diff -up --recursive 2.6.11.3.clean/drivers/md/Kconfig 2.6.11.3/drivers/md/Kconfig
--- 2.6.11.3.clean/drivers/md/Kconfig	2005-03-13 01:44:06.000000000 -0500
+++ 2.6.11.3/drivers/md/Kconfig	2005-04-10 03:07:29.000000000 -0400
@@ -227,5 +227,13 @@ config DM_ZERO
 	  A target that discards writes, and returns all zeroes for
 	  reads.  Useful in some recovery situations.
 
+config DM_DDRAID
+	tristate "Distributed Data RAID target (EXPERIMENTAL)"
+	depends on BLK_DEV_DM && EXPERIMENTAL
+	---help---
+	  This device-mapper target allows you to join together several
+	  network and/or local block devices into a raid 3.5 array.
+	  If unsure, say N.
+
 endmenu
 
diff -up --recursive 2.6.11.3.clean/drivers/md/Makefile 2.6.11.3/drivers/md/Makefile
--- 2.6.11.3.clean/drivers/md/Makefile	2005-03-13 01:44:27.000000000 -0500
+++ 2.6.11.3/drivers/md/Makefile	2005-05-15 22:17:42.000000000 -0400
@@ -33,6 +33,9 @@ obj-$(CONFIG_DM_CRYPT)		+= dm-crypt.o
 obj-$(CONFIG_DM_SNAPSHOT)	+= dm-snapshot.o
 obj-$(CONFIG_DM_MIRROR)		+= dm-mirror.o
 obj-$(CONFIG_DM_ZERO)		+= dm-zero.o
+obj-$(CONFIG_DM_DDRAID)		+= dm-ddraid.o
+obj-$(CONFIG_DM_DDRAID)		+= dm-ddsnap.o
+obj-$(CONFIG_DM_DDRAID)		+= dm-loop.o
 
 quiet_cmd_unroll = UNROLL  $@
       cmd_unroll = $(PERL) $(srctree)/$(src)/unroll.pl $(UNROLL) \
diff -up --recursive 2.6.11.3.clean/drivers/md/dm-ddraid.c 2.6.11.3/drivers/md/dm-ddraid.c
--- 2.6.11.3.clean/drivers/md/dm-ddraid.c	2005-03-30 00:59:53.000000000 -0500
+++ 2.6.11.3/drivers/md/dm-ddraid.c	2005-05-31 21:49:52.000000000 -0400
@@ -0,0 +1,1619 @@
+#include <linux/fs.h>
+#include <linux/slab.h>
+#include <linux/mm.h>
+#include <linux/module.h>
+#include <linux/pagemap.h>
+#include <linux/file.h>
+#include <linux/syscalls.h> // recvmsg
+#include <linux/socket.h>
+#include <linux/un.h>
+#include <net/sock.h>
+#include <asm/uaccess.h>
+#include <linux/bio.h>
+#include "dm.h"
+#include "dm-ddraid.h"
+
+#define warn(string, args...) do { printk("%s: " string "\n", __func__, ##args); } while (0)
+#define error(string, args...) do { warn(string, ##args); BUG(); } while (0)
+#define assert(expr) do { if (!(expr)) error("Assertion " #expr " failed!"); } while (0)
+#define trace_on(args) args
+#define trace_off(args)
+
+#define trace trace_off
+#define tracebio trace_off
+#define DDRAID
+#define NORAID 0
+#define NOCALC 1
+#define NOSYNC 1
+
+/*
+ * To do:
+ *  - accept highwater updates
+ *  - handle IO failures
+ *  - download/upload region dirty list distributions (faster failover)
+ *  - some sane approach to read balancing so user space can specify policy
+ */
+
+static int transfer(struct file *file, const void *buffer, unsigned int count,
+	ssize_t (*op)(struct kiocb *, const char *, size_t, loff_t), int mode)
+{
+	struct kiocb iocb;
+	mm_segment_t oldseg = get_fs();
+	int err = 0;
+
+	trace_off(warn("%s %i bytes", mode == FMODE_READ? "read": "write", count);)
+	if (!(file->f_mode & mode))
+		return -EBADF;
+	if (!op)
+		return -EINVAL;
+	init_sync_kiocb(&iocb, file); // new in 2.5 (hmm)
+	iocb.ki_pos = file->f_pos;
+	set_fs(get_ds());
+	while (count) {
+		int chunk = (*op)(&iocb, buffer, count, iocb.ki_pos);
+		if (chunk <= 0) {
+			err = chunk? chunk: -EPIPE;
+			break;
+		}
+		BUG_ON(chunk > count);
+		count -= chunk;
+		buffer += chunk;
+	}
+	set_fs(oldseg);
+	file->f_pos = iocb.ki_pos;
+	return err;
+}
+
+static inline int readpipe(struct file *file, void *buffer, unsigned int count)
+{
+	return transfer(file, buffer, count, (void *)file->f_op->aio_read, FMODE_READ);
+}
+
+static inline int writepipe(struct file *file, void *buffer, unsigned int count)
+{
+	return transfer(file, buffer, count, file->f_op->aio_write, FMODE_WRITE);
+}
+
+#define outbead(SOCK, CODE, STRUCT, VALUES...) ({ \
+	struct { struct head head; STRUCT body; } PACKED message = \
+		{ { CODE, sizeof(STRUCT) }, { VALUES } }; \
+	writepipe(SOCK, &message, sizeof(message)); })
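+
+/* For example, outbead(info->sock, IDENTIFY, struct identify, .id = 6)
+ * emits { { IDENTIFY, sizeof(struct identify) }, { 6 } } as a single
+ * writepipe call. */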
+
+static int recv_fd(int sock, char *bogus, unsigned *len)
+{
+	char payload[CMSG_SPACE(sizeof(int))];
+	struct msghdr msg = {
+		.msg_control = payload,
+		.msg_controllen = sizeof(payload),
+		.msg_iov = &(struct iovec){ .iov_base = bogus, .iov_len = *len },
+		.msg_iovlen = 1,
+	};
+	mm_segment_t oldseg = get_fs();
+	struct cmsghdr *cmsg;
+	int result;
+
+	set_fs(get_ds());
+	result = sys_recvmsg(sock, &msg, 0);
+	set_fs(oldseg);
+
+	if (result <= 0)
+		return result;
+	if (!(cmsg = CMSG_FIRSTHDR(&msg)))
+		return -ENODATA;
+	if (cmsg->cmsg_len != CMSG_LEN(sizeof(int)) ||
+		cmsg->cmsg_level != SOL_SOCKET ||
+		cmsg->cmsg_type != SCM_RIGHTS)
+		return -EBADMSG;
+
+	*len = result;
+	return *((int *)CMSG_DATA(cmsg));
+}
+
+static void submit_bdev(struct bio *bio, struct block_device *bdev)
+{
+	bio->bi_bdev = bdev;
+	generic_make_request(bio);
+}
+
+static inline long IS_ERRNO(const void *ptr)
+{
+	return unlikely(IS_ERR(ptr))? (unsigned long)ptr: 0;
+}
+
+#if 0
+static void kick(struct block_device *dev)
+{
+	request_queue_t *q = bdev_get_queue(dev);
+	if (q->unplug_fn)
+		q->unplug_fn(q);
+}
+#endif
+
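+/* Copy n bytes from page sp at offset os to page dp at offset od, using two
+ * atomic kmap slots so both mappings can be live at once. */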
+static void pagecopy(struct page *sp, unsigned os, struct page *dp, unsigned od, unsigned n)
+{
+	void *s = kmap_atomic(sp, KM_USER0);
+	void *d = kmap_atomic(dp, KM_USER1);
+	memcpy(d + od, s + os, n);
+	kunmap_atomic(s, KM_USER0);
+	kunmap_atomic(d, KM_USER1);
+}
+
+static inline void hexdump(void *data, unsigned length)
+{
+	while (length) {
+		int row = length < 16? length: 16;
+		printk("%p: ", data);
+		length -= row;
+		while (row--)
+			printk("%02x ", *(unsigned char *)data++);
+		printk("\n");
+	}
+}
+
+/*
+ * Bio stacking hack.
+ *
+ * A block device is essentially a stack of virtualization layers, where
+ * each layer is a virtual device, or at the bottom of the stack, a real
+ * device.  Each layer has a driver that receives the bio and either
+ * relays it to the next layer or handles it in some other way, perhaps
+ * by creating one or more new bios, submitting those and arranging to
+ * signal completion of the original bio when all the "stacked" bios have
+ * completed.  So we have two ways of passing a bio from one layer of the
+ * device stack to another: relaying and stacking.  In the relay case, the
+ * sector and/or device fields may be rewritten by the underlying driver,
+ * and therefore the submitter may not rely on either field after
+ * submitting the bio.  Consequently, if the underlying driver does not
+ * relay the bio but services it by other means, such as stacking, the
+ * underlying driver owns these two fields until it signals completion.
+ * This is convenient, since a stacking driver needs some way to find the
+ * original bio when the underlying bios complete, and may need other
+ * working storage as well.
+ *
+ * To provide some semblance of type safety, I provide inline wrappers to
+ * alias the two fields as an atomic count and a void * pointer respectively.
+ * This assumes that an atomic count will always fit in the bdev field
+ * (hashed locking was adopted for atomic fields for one architecture that
+ * lacked a native atomic type) and that a pointer will always fit into the
+ * sector field.  The driver must take care not to set these aliased fields
+ * before it has retrieved the original contents.
+ *
+ * My original approach to stacking a bio was to hook the private field and
+ * restore it on completion, making it unavailable to the true owner while
+ * the bio is in flight.  This seemed a little risky.
+ */
+
+static inline atomic_t *bio_hackcount(struct bio *bio)
+{
+	return (atomic_t *)&bio->bi_bdev;
+}
+
+static inline int *bio_hacklong(struct bio *bio)
+{
+	return (int *)&bio->bi_bdev;
+}
+
+static inline void **bio_hackhook(struct bio *bio)
+{
+	return (void **)&bio->bi_sector;
+}
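+
+/* Typical stacking pattern, as used by submit_rw and the clone endio
+ * handlers below: the submitter sets *bio_hackcount(bio) to the number of
+ * clones in flight and parks a struct hook in *bio_hackhook(bio); each
+ * clone's endio decrements the count, and the last one to complete
+ * retrieves the hook and ends the parent bio. */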
+
+typedef u64 chunk_t;
+
+#define SECTOR_SHIFT 9
+#define FINISH_FLAG 4
+#define HASH_BUCKETS 64
+#define MASK_BUCKETS (HASH_BUCKETS - 1)
+#define MAX_MEMBERS 10
+
+#ifdef DDRAID
+#  define is_ddraid 1
+#else
+#  define is_ddraid 0
+#endif
+
+struct devinfo {
+	unsigned flags;
+	unsigned region_size_bits;
+#ifdef DDRAID
+	int blocksize_bits, fragsize_bits;
+#endif
+	struct dm_dev *member[MAX_MEMBERS];
+	unsigned members;
+	struct file *sock;
+	struct file *control_socket;
+	struct semaphore server_in_sem;
+	struct semaphore server_out_sem;
+	struct semaphore more_work_sem;
+	struct semaphore destroy_sem;
+	struct semaphore exit1_sem;
+	struct semaphore exit2_sem;
+	struct semaphore exit3_sem;
+	struct list_head hash[HASH_BUCKETS];
+	struct list_head requests;
+	struct list_head releases;
+	struct list_head bogus;
+	struct region *spare_region;
+	spinlock_t region_lock;
+	spinlock_t endio_lock;
+	atomic_t destroy_hold;
+	region_t highwater;
+	unsigned balance_acc, balance_num, balance_den, balance;
+	int dead;
+};
+
+static inline int running(struct devinfo *info)
+{
+	return !(info->flags & FINISH_FLAG);
+}
+
+static inline int frags_per_block_bits(struct devinfo *info)
+{
+	return info->blocksize_bits - info->fragsize_bits;
+}
+
+static inline int blocksize(struct devinfo *info)
+{
+	return 1 << info->blocksize_bits;
+}
+
+/*
+ * SMP Locking notes:
+ *
+ * endio_lock protects:
+ *   - only the retire queue
+ *
+ * region_lock protects:
+ *   - region hash list
+ *   - region desync and drain bits
+ *   - incrementing region count
+ *
+ * Decrementing region->count is not protected by region_lock so that region_lock
+ * does not have to disable irqs.  This is safe because only the zero state is
+ * meaningful outside interrupt context, and once zero is reached there will be
+ * no more racy decrements.
+ *
+ * These locks are never nested.
+ */
+
+/* Region hash records both dirty and desynced regions */
+struct region {
+	atomic_t count;
+	unsigned flags;
+	region_t regnum;
+	struct list_head hash;
+	struct list_head wait;
+};
+
+/* Gizmo union eliminates a few nasty allocations */
+struct defer { struct list_head list; struct bio *bio; };
+struct query { struct list_head list; region_t regnum; };
+
+struct hook {
+	sector_t sector; // debug trace
+	unsigned length; // debug trace
+	struct devinfo *info;
+	struct region *region;
+	struct bio *parity; };
+
+struct retire {
+	struct list_head list;
+	struct devinfo *info;
+	struct region *region;
+	struct timer_list *timer; };
+
+union gizmo {
+	struct defer defer;
+	struct query query;
+	struct retire retire;
+	struct hook hook; };
+
+static kmem_cache_t *gizmo_cache;
+
+static void *alloc_gizmo(void)
+{
+	return kmem_cache_alloc(gizmo_cache, GFP_NOIO|__GFP_NOFAIL);
+}
+
+#ifdef DDRAID
+typedef unsigned long long xor_t;
+#define S4K2 (4096 / (2*sizeof(xor_t)))
+#define S4K4 (4096 / (4*sizeof(xor_t)))
+#define S4K8 (4096 / (8*sizeof(xor_t)))
+#define S4K16 (4096 / (16*sizeof(xor_t)))
+
+static void compute_parity(struct devinfo *info, xor_t *v, xor_t *p)
+{
+	int fragsize = 1 << info->fragsize_bits;
+	int frags = info->members - 1;
+	int stride = fragsize / sizeof(xor_t);
+	xor_t *limit = p + stride;
+#if 1 /* doesn't seem to help much */
+	switch (blocksize(info) == 4096? frags: 0) {
+#if 0
+	case 1:
+		warn(">>>optimize for mirror");
+		memcpy(p, v, fragsize);
+#endif
+	case 2:
+		for (; p < limit; p += 4, v += 4) {
+			*(p + 0) = *(v + 0) ^ *(v + 0 + S4K2);
+			*(p + 1) = *(v + 1) ^ *(v + 1 + S4K2);
+			*(p + 2) = *(v + 2) ^ *(v + 2 + S4K2);
+			*(p + 3) = *(v + 3) ^ *(v + 3 + S4K2);
+		}
+		return;
+	case 4:
+		for (; p < limit; v++)
+			*p++ =	*(v + 0*S4K4) ^ *(v + 1*S4K4) ^ *(v + 2*S4K4) ^ *(v + 3*S4K4);
+		return;
+	case 8:
+		for (; p < limit; v++)
+			*p++ =	*(v + 0*S4K8) ^ *(v + 1*S4K8) ^ *(v + 2*S4K8) ^ *(v + 3*S4K8) ^
+				*(v + 4*S4K8) ^ *(v + 5*S4K8) ^ *(v + 6*S4K8) ^ *(v + 7*S4K8);
+		return;
+	case 16:
+		for (; p < limit; v++)
+			*p++ =	*(v + 0*S4K16) ^ *(v + 1*S4K16) ^ *(v + 2*S4K16) ^ *(v + 3*S4K16) ^
+				*(v + 4*S4K16) ^ *(v + 5*S4K16) ^ *(v + 6*S4K16) ^ *(v + 7*S4K16) ^
+				*(v + 8*S4K16) ^ *(v + 9*S4K16) ^ *(v + 10*S4K16) ^ *(v + 11*S4K16) ^
+				*(v + 12*S4K16) ^ *(v + 13*S4K16) ^ *(v + 14*S4K16) ^ *(v + 15*S4K16);
+		return;
+	}
+#endif
+	while (p < limit) {
+		int n = frags - 1;
+		xor_t x = *v, *q = v;
+	
+		while (n--)
+			x ^= *(q += stride);
+		*p++ = x;
+		v++;
+	}
+}
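+
+/* Worked example (illustrative): blocksize 4096 and members = 5 gives
+ * frags = 4, fragsize = 1024 and stride = S4K4, so the case 4 loop computes
+ * p[i] = v[i] ^ v[i + S4K4] ^ v[i + 2*S4K4] ^ v[i + 3*S4K4], the parity of
+ * the four 1K data fragments packed consecutively in a 4K block. */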
+
+static int verify_parity(struct devinfo *info, xor_t *v, xor_t *p)
+{
+	unsigned frags = info->members - 1;
+	unsigned stride = (1 << info->fragsize_bits) / sizeof(xor_t);
+	xor_t *limit = p + stride;
+
+	while (p < limit) {
+		int n = frags - 1;
+		xor_t x = *v, *q = v;
+
+		while (n--)
+			x ^= *(q += stride);
+		if (*p++ ^ x)
+			return -1;
+		v++;
+	}
+	return 0;
+}
+#endif
+
+/*
+ * Life cycle of a raid write request:
+ *
+ * A write request arrives in _map, then if it can't be handled immediately,
+ * it goes to the work daemon, hooked onto a struct region by a struct defer,
+ * which emits the write request message.  The incoming daemon receives the
+ * response, finds the region with the defer list in the hash, and submits
+ * any defered bio requests.  The bio completion has to be hooked in order to
+ * keep track of writes in progress, by linking a struct hook into the bio's
+ * private field to store the old completion and private fields so they can
+ * be restored after our own completion handler runs.  The completion
+ * handler runs in interrupt context, so when the final active write on a
+ * region completes, this has to be communicated to a daemon that can send
+ * the release message by linking a struct retire onto the raid releases
+ * list.  The work daemon picks up the retires, checks the region status
+ * under a lock to be sure no new io came along in the meantime, and if
+ * not, emits the release message and removes the region from the hash,
+ * unless it's an unsynced region below the sync highwater mark, in which
+ * case it stays, so that readers can find out about unsynced regions by
+ * looking in the region hash.
+ */
+
+#define DESYNC_FLAG 1
+#define DRAIN_FLAG 2
+#define PAUSE_FLAG 4
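+
+/*
+ * Region count states (see ddraid_map and release_region_unlock):
+ *   >= 0  write grant held; the count is the number of writes in flight
+ *   -1    write grant requested from the server, not yet granted
+ *   -2    desynced region kept cached for readers, no grant held
+ */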
+
+static inline unsigned hash_region(region_t value)
+{
+	return value & MASK_BUCKETS;
+}
+
+static inline void get_region(struct region *region)
+{
+	atomic_inc(&region->count);
+}
+
+static inline int put_region_test_zero(struct region *region)
+{
+	return atomic_dec_and_test(&region->count);
+}
+
+static inline int region_count(struct region *region)
+{
+	return atomic_read(&region->count);
+}
+
+static inline void set_region_count(struct region *region, int value)
+{
+	atomic_set(&region->count, value);
+}
+
+static inline int is_desynced(struct region *region)
+{
+	return region->flags & DESYNC_FLAG;
+}
+
+static inline int drain_region(struct region *region)
+{
+	return (region->flags & DRAIN_FLAG);
+}
+
+static inline void _show_regions(struct devinfo *info)
+{
+	unsigned i, regions = 0, defered = 0;
+
+	spin_lock(&info->region_lock);
+	for (i = 0; i < HASH_BUCKETS; i++) {
+		struct list_head *list;
+		list_for_each(list, info->hash + i) {
+			struct region *region = list_entry(list, struct region, hash);
+			struct list_head *wait;
+			printk(is_desynced(region)? "~": "");
+			printk("%Lx/%i ", (long long)region->regnum, region_count(region));
+			list_for_each(wait, &region->wait) {
+				struct defer *defer = list_entry(wait, struct defer, list);
+				printk("<%Lx> ", (long long)(defer->bio? defer->bio->bi_sector: -1));
+				defered++;
+			}
+			regions++;
+		}
+	}
+	printk("(%u/%u)\n", regions, defered);
+	spin_unlock(&info->region_lock);
+}
+
+#define show_regions(info) do { warn("regions:"); _show_regions(info); } while (0)
+
+static struct region *find_region(struct devinfo *info, region_t regnum)
+{
+	struct list_head *list, *bucket = info->hash + hash_region(regnum);
+	struct region *region;
+
+	list_for_each(list, bucket)
+		if ((region = list_entry(list, struct region, hash))->regnum == regnum)
+			goto found;
+	trace(warn("No cached region %Lx", (long long)regnum);)
+	return NULL;
+found:
+	trace(warn("Found region %Lx", (long long)regnum);)
+	return region;
+}
+
+static void insert_region(struct devinfo *info, struct region *region)
+{
+	INIT_LIST_HEAD(&region->wait);
+	list_add(&region->hash, info->hash + hash_region(region->regnum));
+}
+
+static kmem_cache_t *region_cache;
+
+static struct region *alloc_region(void)
+{
+	return kmem_cache_alloc(region_cache, GFP_NOIO|__GFP_NOFAIL);
+}
+
+static void free_region_unlock(struct devinfo *info, struct region *region)
+{
+	list_del(&region->hash);
+	spin_unlock(&info->region_lock);
+	kmem_cache_free(region_cache, region);
+}
+
+static void queue_request_lock(struct devinfo *info, region_t regnum)
+{
+	struct query *query = alloc_gizmo();
+	*query = (struct query){ .regnum = regnum };
+	spin_lock(&info->region_lock);
+	list_add_tail(&query->list, &info->requests);
+	up(&info->more_work_sem);
+}
+
+static void queue_request(struct devinfo *info, region_t regnum)
+{
+	queue_request_lock(info, regnum);
+	spin_unlock(&info->region_lock);
+}
+
+static void send_release(struct devinfo *info, region_t regnum)
+{
+	down(&info->server_out_sem);
+	outbead(info->sock, RELEASE_WRITE, struct region_message, .regnum = regnum);
+	up(&info->server_out_sem);
+}
+
+static void release_region_unlock(struct devinfo *info, struct region *region)
+{
+	region_t regnum = region->regnum;
+	trace(warn("release region %Lx", (long long)regnum);)
+
+	if (!list_empty(&region->wait)) {
+		if (!drain_region(region))
+			warn("requests leaked!");
+		region->flags &= ~DRAIN_FLAG;
+		atomic_set(&region->count, -1);
+		spin_unlock(&info->region_lock);
+		send_release(info, regnum);
+		queue_request(info, region->regnum);
+		return;
+	}
+
+	/* keep desynced regions for reader cache */
+	if (is_desynced(region) && region->regnum < info->highwater) {
+		atomic_set(&region->count, -2);
+		spin_unlock(&info->region_lock);
+		return;
+	}
+
+	free_region_unlock(info, region);
+	send_release(info, regnum);
+}
+
+static inline char *strio(int is_read)
+{
+	return is_read? "read": "write";
+}
+
+/* interrupt context */
+
+static void queue_release(struct retire *retire)
+{
+	struct devinfo *info = retire->info;
+	trace(warn("queue region %Lx for release", (long long)retire->region->regnum);)
+	spin_lock(&info->endio_lock);
+	list_add_tail(&retire->list, &info->releases);
+	spin_unlock(&info->endio_lock);
+	up(&info->more_work_sem);
+}
+
+static void free_bio_pages(struct bio *bio, int stride)
+{
+	int vec;
+	for (vec = 0; vec < bio->bi_vcnt; vec += stride)
+		__free_page(bio->bi_io_vec[vec].bv_page);
+}
+
+/*
+ * Delayed release.
+ *
+ * When there are no more in-flight writes to a given region, we release
+ * the region so that the server can mark it clean in the persistent dirty
+ * log.  However, if we do this immediately then back-to-back writes will
+ * suffer horribly.  So we need to delay the release a little.  A timer
+ * struct is allocated and freed each time a region looks like it may be
+ * released, and the actual decision to release is made later in the worker
+ * thread.  So there tends to be an annoying extra allocate/release on every
+ * back to back write.  This can probably be changed to a single timer
+ * embedded in the region struct, since only one delayed release can be in
+ * flight per region.  Probably.
+ */
+static void timer_release(unsigned long data)
+{
+	queue_release((struct retire *)data);
+}
+
+static int clone_endio(struct bio *bio, unsigned int done, int error)
+{
+	struct bio *parent = bio->bi_private;
+	tracebio(warn("%p, parent count = %i", bio, atomic_read(bio_hackcount(parent)));)
+	if (atomic_dec_and_test(bio_hackcount(parent))) {
+		struct hook *hook = *bio_hackhook(parent);
+		if (hook) {
+			struct devinfo *info = hook->info;
+			struct bio *parity = hook->parity;
+			if (parity) {
+				tracebio(warn("free parity");)
+				free_bio_pages(parity, 1 << frags_per_block_bits(info));
+				bio_put(parity);
+			}
+			tracebio(warn("free hook");)
+			kmem_cache_free(gizmo_cache, hook);
+		}
+		tracebio(warn("release parent");)
+		bio_endio(parent, parent->bi_size, error);
+	}
+	bio_put(bio);
+	return 0;
+}
+
+static int bounce_read_endio(struct bio *bounce, unsigned int done, int error)
+{
+	struct bio *parent = bounce->bi_private;
+	struct page *bp = bounce->bi_io_vec[0].bv_page;
+	struct page *pp = parent->bi_io_vec[0].bv_page;
+	unsigned o = *bio_hacklong(parent);
+
+	tracebio(warn("copy from bounce %p+%x", bounce, o);)
+	pagecopy(bp, o, pp, parent->bi_io_vec[0].bv_offset, parent->bi_size); // !!! what about error
+	flush_dcache_page(pp);
+	__free_page(bp);
+	bio_endio(parent, parent->bi_size, error);
+	bio_put(bounce);
+	return 0;
+}
+
+static int clone_write_endio(struct bio *bio, unsigned int done, int error)
+{
+	struct bio *parent = bio->bi_private;
+
+	tracebio(warn("%p, parent count = %i", bio, atomic_read(bio_hackcount(parent)));)
+	if (atomic_dec_and_test(bio_hackcount(parent))) {
+		struct hook *hook = *bio_hackhook(parent);
+		struct devinfo *info = hook->info;
+		struct region *region = hook->region;
+		struct bio *parity = hook->parity;
+
+		trace(warn("parent end io");)
+		if (put_region_test_zero(region)) {
+			*(struct retire *)hook = (struct retire){ .info = info, .region = region };
+			if (1) {
+				struct timer_list *timer = kmalloc(sizeof(struct timer_list), GFP_ATOMIC);
+				get_region(region);
+				trace(warn("delay region %Lx release, count = %i", (long long)region->regnum, region_count(region));)
+				init_timer(timer);
+				timer->function = timer_release;
+				timer->expires = jiffies + HZ;
+				timer->data = (unsigned long)hook;
+				((struct retire *)hook)->timer = timer;
+				add_timer(timer);
+				if (atomic_add_return(1, &info->destroy_hold) == 1)
+					down(&info->destroy_sem);
+			} else
+				queue_release((struct retire *)hook);
+		} else
+			kmem_cache_free(gizmo_cache, hook);
+		bio_endio(parent, parent->bi_size, error); /* after destroy_hold inc */
+
+		if (parity) {
+			tracebio(warn("put bio, count = %i", atomic_read(&parity->bi_cnt));)
+			free_bio_pages(parity, 1 << frags_per_block_bits(info));
+			bio_put(parity);
+		}
+	}
+	tracebio(warn("put bio, count = %i", atomic_read(&bio->bi_cnt));)
+	bio_put(bio);
+	return 0;
+}
+
+/*
+ * Reconstruction: Let's do a nasty trick.  Copy the parity to the
+ * missing fragment, then compute_parity with the same fragment as
+ * destination, overwriting the parity with the reconstructed data.
+ */
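+/* (This works because xor is its own inverse: the missing fragment equals
+ * the xor of the parity with the surviving fragments, which is exactly what
+ * compute_parity produces once the parity is copied into the dead slot.) */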
+static int clone_read_endio(struct bio *bio, unsigned int done, int error)
+{
+	struct bio *parent = bio->bi_private;
+
+	tracebio(warn("%p, parent count = %i", bio, atomic_read(bio_hackcount(parent)));)
+	if (atomic_dec_and_test(bio_hackcount(parent))) {
+		struct hook *hook = *bio_hackhook(parent);
+		struct bio *parity = hook->parity;
+		trace(warn("parent end io");)
+
+		if (parity) {
+			struct devinfo *info = hook->info;
+
+			if (!NOCALC) {
+				int vec;
+				for (vec = 0; vec < bio->bi_vcnt; vec++) {
+					struct page *spage = parent->bi_io_vec[vec].bv_page;
+					struct page *ppage = parity->bi_io_vec[vec].bv_page;
+					void *s = kmap_atomic(spage, KM_USER0);
+					void *p = kmap_atomic(ppage, KM_USER1);
+					int mask = ~PAGE_CACHE_MASK;
+					int offset = (vec << info->fragsize_bits) & mask;
+					int dead = info->dead;
+
+					if (dead >= 0) {
+						void *d = s + (dead << info->fragsize_bits);
+						memcpy(d, p + offset, 1 << info->fragsize_bits);
+						compute_parity(info, s, d);
+						flush_dcache_page(ppage);
+					} else {
+						if (verify_parity(info, s, p + offset))
+							warn("Parity check failed, bio=%Lx/%x", (long long)hook->sector, hook->length);
+					}
+					kunmap_atomic(s, KM_USER0);
+					kunmap_atomic(p, KM_USER1);
+				}
+			}
+			free_bio_pages(parity, 1 << frags_per_block_bits(info));
+			trace_off(warn("put parity bio, count = %i", atomic_read(&parity->bi_cnt));)
+			bio_put(parity);
+		}
+		bio_endio(parent, parent->bi_size, error);
+		kmem_cache_free(gizmo_cache, hook);
+	}
+	trace_off(warn("put bio, count = %i", atomic_read(&bio->bi_cnt));)
+	bio_put(bio);
+	return 0;
+}
+
+/*
+ * Degraded mode:
+ * Lost parity disk: don't submit/check parity bio
+ * Lost data disk, write: don't submit bio for missing disk
+ * Lost data disk, read: reconstruct missing frag as xor of others
+ */
+static int submit_rw(struct devinfo *info, struct bio *bio, int synced, struct hook *hook, bio_end_io_t endio)
+{
+#ifdef DDRAID
+	int vec, vecs = bio->bi_vcnt;
+	int disk, disks = info->members, dead = info->dead;
+	int is_read = bio_data_dir(bio) == READ;
+	int need_hook = 1; // !!! don't need hook if parity dead
+	int fragsize = 1 << info->fragsize_bits;
+	int mask = ~PAGE_CACHE_MASK; // !!! assume blocksize = pagesize for now
+	int err = 0;
+	sector_t sector = bio->bi_sector; // hackhook trashes bi_sector
+
+	tracebio(warn("submit %i clones, size = %x, vecs = %i", disks, fragsize, vecs);)
+	atomic_set(bio_hackcount(bio), disks - (dead >= 0));
+
+	if (need_hook) {
+		if (!hook) {
+			hook = alloc_gizmo();
+			*hook = (struct hook){ .info = info };
+		}
+		hook->sector = sector; // debug only
+		hook->length = bio->bi_size; // debug only
+		*bio_hackhook(bio) = hook;
+	}
+
+	for (disk = 0; disk < disks; disk++) {
+		int is_parity = (disk == disks - 1);
+		struct page *parity_page = NULL;
+		struct bio *clone;
+
+		if (disk == dead)
+			continue;
+
+		clone = bio_alloc(GFP_NOIO, vecs);
+		clone->bi_rw = bio->bi_rw;
+		clone->bi_bdev = (info->member[disk])->bdev;
+		clone->bi_sector = sector >> frags_per_block_bits(info);
+		clone->bi_vcnt = vecs;
+		clone->bi_size = vecs << info->fragsize_bits;
+		clone->bi_private = bio;
+		clone->bi_end_io = endio;
+
+		if (is_parity) {
+			hook->parity = clone;
+			bio_get(clone);
+		}
+
+		for (vec = 0; vec < vecs; vec++) {
+			struct page *spage = bio->bi_io_vec[vec].bv_page;
+			unsigned offset;
+
+			if (!is_parity) {
+				clone->bi_io_vec[vec] = (struct bio_vec){
+					.bv_page = spage,
+					.bv_offset = disk << info->fragsize_bits,
+					.bv_len = fragsize };
+				continue;
+			}
+
+			if (!(offset = (vec << info->fragsize_bits) & mask))
+				parity_page = alloc_page(GFP_NOIO);
+
+			clone->bi_io_vec[vec] = (struct bio_vec){
+				.bv_page = parity_page,
+				.bv_offset = offset,
+				.bv_len = fragsize };
+
+			if (!NOCALC && !is_read) {
+				// should do this only once per page
+				void *s = kmap_atomic(spage, KM_USER0);
+				void *p = kmap_atomic(parity_page, KM_USER1);
+				compute_parity(info, s, p + offset);
+				flush_dcache_page(parity_page);
+				kunmap_atomic(s, KM_USER0);
+				kunmap_atomic(p, KM_USER1);
+			}
+		}
+		trace_off(warn("clone %i, size = %x, vecs = %i", disk, clone->bi_size, clone->bi_vcnt);)
+		generic_make_request(clone);
+	}
+#else
+	int i, err = 0;
+
+	if (!synced) {
+		trace(warn("submit degraded write"));
+		submit_bdev(bio, info->member[0]->bdev);
+		return 0;
+	}
+
+	atomic_set(bio_hackcount(bio), info->members);
+
+	for (i = 0; i < info->members; i++) {
+		struct bio *clone = bio_clone(bio, GFP_NOIO);
+		clone->bi_private = bio;
+		clone->bi_end_io = clone_endio;
+		submit_bdev(clone, (info->member[i])->bdev);
+	}
+#endif
+	return err;
+}
+
+static void submit_write(struct devinfo *info, struct bio *bio, struct region *region, struct hook *hook)
+{
+	*hook = (struct hook){ .info = info, .region = region };
+	/* !!! debug: force the synced path for now */
+	submit_rw(info, bio, 1 || !is_desynced(region), hook, clone_write_endio);
+}
+
+/* Drops and retakes region lock */
+static void restore_spare_region(struct devinfo *info)
+{
+	struct region *region;
+	spin_unlock(&info->region_lock);
+	trace(warn(""));
+	region = alloc_region();
+	spin_lock(&info->region_lock);
+	if (info->spare_region)
+		kmem_cache_free(region_cache, region);
+	else
+		info->spare_region = region;
+}
+
+static int ddraid_map(struct dm_target *target, struct bio *bio)
+{
+	struct devinfo *info = target->private;
+	unsigned sectors_per_block = info->blocksize_bits - SECTOR_SHIFT;
+	unsigned secmask = ((1 << sectors_per_block) - 1);
+	unsigned blockmask = blocksize(info) - 1;
+	unsigned sector = bio->bi_sector, is_read = bio_data_dir(bio) == READ;
+	unsigned size = bio->bi_size;
+	region_t regnum = sector >> (info->region_size_bits - SECTOR_SHIFT);
+	struct region *region;
+	struct defer *defer;
+
+	trace(warn("%s %Lx/%x, region %Lx", strio(is_read), (long long)sector, size, (long long)regnum);)
+	assert(size <= 1 << info->region_size_bits);
+
+	if (NORAID) {
+		submit_bdev(bio, info->member[0]->bdev);
+		return 0;
+	}
+
+	if ((sector & secmask) || (size & blockmask)) {
+		unsigned o = (sector << SECTOR_SHIFT) & blockmask;
+		struct bio_vec *bvec = bio->bi_io_vec;
+		struct bio *bounce;
+		struct page *pp;
+		if (((sector & secmask) + (size >> SECTOR_SHIFT)) > 1 << sectors_per_block || !is_read) {
+			warn("Long odd block %s failed", strio(is_read));
+			return -EIO;
+		}
+		warn("%s odd block, %Lx/%x", strio(is_read), (long long)sector, size);
+		pp = alloc_page(GFP_NOIO);
+		bounce = bio_alloc(GFP_NOIO, 1);
+		bounce->bi_rw = bio->bi_rw;
+		bounce->bi_sector = sector & ~secmask;
+		bounce->bi_size = blocksize(info);
+		bounce->bi_vcnt = 1;
+		bounce->bi_io_vec[0] = (struct bio_vec){ .bv_page = pp, .bv_len = PAGE_CACHE_SIZE }; // !!! PAGE_SIZE
+		bounce->bi_private = bio;
+		bounce->bi_end_io = bounce_read_endio;
+		*bio_hacklong(bio) = o;
+		if (!is_read) {
+			pagecopy(bvec->bv_page, bvec->bv_offset, pp, o, size);
+			flush_dcache_page(pp);
+		}
+		return submit_rw(info, bounce, 1, NULL, clone_read_endio);
+	}
+
+	if (NOSYNC) {
+		if (is_read) {
+			if ((info->balance_acc += size) >= info->balance_den) {
+				info->balance_acc -= info->balance_den;
+				if (++info->balance == info->members)
+					info->balance = 0;
+			}
+			if (info->members == 2) {
+				submit_bdev(bio, info->member[info->balance]->bdev);
+				return 0;
+			}
+		}
+		submit_rw(info, bio, 1, NULL, is_read? clone_read_endio: clone_endio);
+		return 0;
+	}
+
+	if (is_read) {
+		int synced = 0;
+
+		if (regnum < info->highwater) {
+			spin_lock(&info->region_lock);
+			region = find_region(info, regnum);
+			synced = !region || !is_desynced(region);
+			spin_unlock(&info->region_lock);
+		}
+
+		if ((info->balance_acc += size) >= info->balance_den) {
+			info->balance_acc -= info->balance_den;
+			if (++info->balance == info->members)
+				info->balance = 0;
+		}
+#ifdef DDRAID
+		if (info->members == 2) {
+			submit_bdev(bio, info->member[info->balance]->bdev);
+			return 0;
+		}
+		submit_rw(info, bio, 1, NULL, clone_read_endio);
+#else
+		submit_bdev(bio, info->member[synced? info->balance: 0]->bdev);
+#endif
+		return 0;
+	}
+
+	defer = alloc_gizmo();
+	*defer = (struct defer){ .bio = bio };
+
+	/*
+	 * This would all be a lot easier if we didn't have to worry about
+	 * holding the region lock over all the changes to the region hash
+	 * while trying to allocate new structs.
+	 *
+	 * The easy way is to allocate a region before taking the spinlock and
+	 * give it back if we find one is already there, but for most writes
+	 * this is just extra work, so instead we keep a spare region around,
+	 * and restore it later if it gets used.  Versus a mempool, this
+	 * strategy spends much less time under the spinlock.
+	 */
+	spin_lock(&info->region_lock);
+try_again:
+	if (!(region = find_region(info, regnum))) {
+		if (!info->spare_region) {
+			restore_spare_region(info);
+			goto try_again;
+		}
+		region = info->spare_region;
+		info->spare_region = NULL;
+		*region = (struct region){ .regnum = regnum };
+		insert_region(info, region);
+		goto queue_query;
+	}
+
+	/* Already have write grant?  Region will stay synced or unsynced */
+	if (region_count(region) >= 0 && !drain_region(region)) {
+		trace(warn("rewrite region %Lx, count = %i", (long long)region->regnum, region_count(region));)
+		get_region(region);
+		spin_unlock(&info->region_lock);
+		submit_write(info, bio, region, (struct hook *)defer);
+		return 0;
+	}
+
+	if (region_count(region) == -2) {
+queue_query:	set_region_count(region, -1); /* now we own it */
+		spin_unlock(&info->region_lock);
+		queue_request_lock(info, region->regnum);
+	}
+
+	list_add_tail(&defer->list, &region->wait);
+	if (!info->spare_region)
+		restore_spare_region(info);
+	spin_unlock(&info->region_lock);
+	trace(show_regions(info);)
+	return 0;
+}
+
+/*
+ * This next bit is bogus because dm already knows how to defer requests but is too
+ * messed up to allow a target to start in that state.  This goes away when dm gets
+ * a good dunging-out.
+ */
+static int ddraid_map_bogus(struct dm_target *target, struct bio *bio, union map_info *context)
+{
+	struct devinfo *info = target->private;
+	if (info->region_size_bits == -1) {
+		struct defer *defer = alloc_gizmo();
+
+		spin_lock(&info->region_lock);
+		if (info->region_size_bits != -1) {
+			spin_unlock(&info->region_lock);
+			kmem_cache_free(gizmo_cache, defer);
+			goto map;
+		}
+
+		*defer = (struct defer){ .bio = bio };
+		list_add_tail(&defer->list, &info->bogus);
+		spin_unlock(&info->region_lock);
+		return 0;
+	}
+map:
+	return ddraid_map(target, bio);
+}
+
+static void send_next_request_locked(struct devinfo *info)
+{
+	struct list_head *entry = info->requests.next;
+	struct query *query = list_entry(entry, struct query, list);
+
+	list_del(entry);
+	spin_unlock(&info->region_lock);
+	down(&info->server_out_sem);
+	outbead(info->sock, REQUEST_WRITE, struct region_message, .regnum = query->regnum);
+	up(&info->server_out_sem);
+	kmem_cache_free(gizmo_cache, query);
+	spin_lock(&info->region_lock);
+}
+
+static int worker(struct dm_target *target)
+{
+	struct devinfo *info = target->private;
+
+	daemonize("ddraid-worker");
+	down(&info->exit1_sem);
+	while (running(info)) {
+		unsigned long irqsave;
+		down(&info->more_work_sem);
+
+		/* Send write request messages */
+		spin_lock(&info->region_lock);
+		while (!list_empty(&info->requests) && !(info->flags & (FINISH_FLAG|PAUSE_FLAG)))
+			send_next_request_locked(info);
+		spin_unlock(&info->region_lock);
+
+		/* Send write release messages */
+		spin_lock_irqsave(&info->endio_lock, irqsave);
+		while (!list_empty(&info->releases) && running(info)) {
+			struct list_head *entry = info->releases.next;
+			struct retire *retire = list_entry(entry, struct retire, list);
+			struct region *region = retire->region;
+
+			list_del(entry);
+			spin_unlock_irqrestore(&info->endio_lock, irqsave);
+			if (retire->timer)
+				kfree(retire->timer); // !!! make it a cache
+			kmem_cache_free(gizmo_cache, retire);
+			spin_lock(&info->region_lock);
+			trace(warn("release region %Lx, count = %i", (long long)region->regnum, region_count(region));)
+			if (!put_region_test_zero(region)) {
+				/* More submitted before we got here */
+				spin_unlock(&info->region_lock);
+				spin_lock_irqsave(&info->endio_lock, irqsave);
+				continue;
+			}
+			release_region_unlock(info, region);
+			if (atomic_dec_and_test(&info->destroy_hold))
+				up(&info->destroy_sem);
+			spin_lock_irqsave(&info->endio_lock, irqsave);
+		}
+		spin_unlock_irqrestore(&info->endio_lock, irqsave);
+
+		trace(show_regions(info);)
+		trace(warn("Yowza! More work?");)
+	}
+	up(&info->exit1_sem); /* !!! crashes if module unloaded before ret executes */
+	warn("%s exiting", current->comm);
+	return 0;
+}
+
+static void do_defered(struct devinfo *info, struct region_message *message, int synced)
+{
+	region_t regnum = message->regnum;
+	struct region *region;
+
+	trace(warn("submit queued writes for region %Lx", (long long)regnum));
+	spin_lock(&info->region_lock);
+	region = find_region(info, regnum);
+	if (!synced && !is_desynced(region) && region->regnum < info->highwater)
+		warn("Desynced region not in cache!");
+
+	/*
+	 * Submitting a request necessarily drops the region spinlock and
+	 * the request just submitted could complete before we get the lock
+	 * again, for example, if the submit bails on an error.  To prevent
+	 * the region from disappearing, take an extra count and also handle
+	 * the possibility that it may need to be released here.
+	 */
+	set_region_count(region, 1); /* extra count */
+	if (is_desynced(region) == synced)
+		region->flags ^= DESYNC_FLAG;
+
+	while (!list_empty(&region->wait)) {
+		struct list_head *entry = region->wait.next;
+		struct defer *defer = list_entry(entry, struct defer, list);
+		trace(warn("bio sector %Lx", (long long)defer->bio->bi_sector));
+		list_del(entry);
+		get_region(region);
+		spin_unlock(&info->region_lock);
+		submit_write(info, defer->bio, region, (struct hook *)defer);
+		trace(show_regions(info);)
+		spin_lock(&info->region_lock);
+	}
+	if (put_region_test_zero(region)) /* drop extra count */
+		release_region_unlock(info, region);
+}
+
+static int incoming(struct dm_target *target)
+{
+	struct devinfo *info = target->private;
+	struct messagebuf message; // !!! have a buffer in the target->info
+	struct file *sock;
+	int err, length;
+
+	daemonize("ddraid-client");
+	down(&info->exit2_sem);
+connect:
+	trace(warn("Request socket connection");)
+	outbead(info->control_socket, NEED_SERVER, struct { });
+	trace(warn("Wait for socket connection");)
+	down(&info->server_in_sem);
+	trace(warn("got socket %p", info->sock);)
+	sock = info->sock;
+
+	while (running(info)) { // stop on module exit
+		trace(warn("wait message");)
+		if ((err = readpipe(sock, &message.head, sizeof(message.head))))
+			goto socket_error;
+		length = message.head.length;
+		if (length > maxbody)
+			goto message_too_long;
+		trace(warn("%x/%u", message.head.code, length);)
+		if ((err = readpipe(sock, &message.body, length)))
+			goto socket_error;
+	
+		switch (message.head.code) {
+		case REPLY_IDENTIFY:
+		{
+			struct reply_identify *body = (struct reply_identify *)&message.body;
+			trace(warn("identify succeeded, region bits = %i", body->region_bits);)
+			spin_lock(&info->region_lock);
+			info->region_size_bits = body->region_bits;
+//			target->split_io = 1 << info->region_size_bits;
+			while (!list_empty(&info->bogus)) {
+				struct list_head *entry = info->bogus.next;
+				struct defer *defer = list_entry(entry, struct defer, list);
+				list_del(entry);
+				spin_unlock(&info->region_lock);
+				ddraid_map(target, defer->bio);
+				kmem_cache_free(gizmo_cache, defer);
+				spin_lock(&info->region_lock);
+			}
+			spin_unlock(&info->region_lock);
+
+			up(&info->server_out_sem);
+			outbead(info->control_socket, REPLY_CONNECT_SERVER, struct { });
+			continue;
+		}
+
+		case GRANT_SYNCED:
+			trace(warn("granted synced");)
+			do_defered(info, (void *)&message.body, 1);
+			break;
+			
+		case GRANT_UNSYNCED:
+			trace(warn("granted unsynced");)
+			do_defered(info, (void *)&message.body, 0);
+			break;
+
+		// On failover, the new server may have found some new unsynced regions
+		// (because a client failed to reconnect) or it might have synced some
+		// regions before we reconnected (assuming it was able to get hold of a
+		// definitive list of which clients held those regions dirty) and we
+		// missed the desync delete broadcast.
+		
+		// If we hold a write grant for a desynced region, the server can't
+		// have synced it (because it hasn't seen us yet, a former writer).
+		// So we can go ahead and keep writing to it.  If the server sends
+		// us a new desync for the region then it's confused and we need to
+		// warn.
+		
+		// If we hold a write grant for a synced region, it's ok to do synced
+		// IO even if the region is now unsynced, because the server must not
+		// resync the region before all writers go away, so there is no chance
+		// for our multi-disk IO to interleave with the server's resync IO.
+		// So the region state may suddenly change from synced to desynced,
+		// which is fine: further writes will be submitted degraded, and the
+		// synced writes in progress won't do any harm.
+		
+		// As usual, the server always gives us up to date state for any write
+		// request.  We upload our list of dirty regions so the server knows
+		// we can still write to them and isn't surprised when it sees release
+		// messages for those regions.
+
+		// As usual, degraded reads are always safe, just possibly suboptimal.
+		// So we only have to worry about balanced reads.  If a client died
+		// while writing a synced region, it's up to the cluster filesystem to
+		// ensure it disregards reads from those blocks.  But the server must
+		// resync the region at some point, so we do need to have some way to
+		// drain any balanced reads in the pipeline.  Damn, it means balanced
+		// reads have to be hooked, and hooks have to be alloced.  At least
+		// reads can still be handled entirely in the submitter's context.
+
+		case ADD_UNSYNCED:
+		{
+			region_t regnum = ((struct region_message *)&message.body)->regnum;
+			struct region *region;
+
+			trace(warn("add unsynced region %Lx", (long long)regnum));
+			spin_lock(&info->region_lock);
+try_again:		if (!(region = find_region(info, regnum))) {
+				if (!info->spare_region) {
+					restore_spare_region(info);
+					goto try_again;
+				}
+				region = info->spare_region;
+				info->spare_region = NULL;
+				*region = (struct region){ .flags = DESYNC_FLAG, .count = { -2 }, .regnum = regnum };
+				insert_region(info, region);
+			} else
+				region->flags |= DESYNC_FLAG;
+			spin_unlock(&info->region_lock);
+			break;
+		}
+
+		case DEL_UNSYNCED:
+		{
+			region_t regnum = ((struct region_message *)&message.body)->regnum;
+			struct region *region;
+
+			trace(warn("del unsynced region %Lx", (long long)regnum));
+			spin_lock(&info->region_lock);
+			if (!(region = find_region(info, regnum)))
+				warn("Deleted uncached unsynced region %Lx", (long long)regnum);
+			else {
+				region->flags &= ~DESYNC_FLAG;
+				if (region_count(region) == -2) {
+					free_region_unlock(info, region);
+					break;
+				}
+			}
+			spin_unlock(&info->region_lock);
+			break;
+		}
+
+		case SET_HIGHWATER:
+			info->highwater = ((struct region_message *)&message.body)->regnum;
+			trace(warn("Set highwater %Lx", (long long)info->highwater));
+			break;
+
+		case DRAIN_REGION:
+		{
+			region_t regnum = ((struct region_message *)&message.body)->regnum;
+			struct region *region;
+
+			trace(warn("drain region %Lx", (long long)regnum));
+			spin_lock(&info->region_lock);
+			if ((region = find_region(info, regnum)) && (region_count(region) >= 0))
+				region->flags |= DRAIN_FLAG;
+			spin_unlock(&info->region_lock);
+			break;
+		}
+
+		case PAUSE_REQUESTS:
+			info->flags |= PAUSE_FLAG;
+			break;
+
+		case RESUME_REQUESTS:
+			spin_lock(&info->region_lock);
+			info->flags &= ~PAUSE_FLAG;
+			while (!list_empty(&info->requests))
+				send_next_request_locked(info);
+			spin_unlock(&info->region_lock);
+			break;
+
+		case BOUNCE_REQUEST:
+			queue_request(info, ((struct region_message *)&message.body)->regnum);
+			break;
+
+		default: 
+			warn("Unknown message %x", message.head.code);
+			continue;
+		}
+	}
+out:
+	up(&info->exit2_sem); /* !!! will crash if module unloaded before ret executes */
+	warn("%s exiting", current->comm);
+	return 0;
+message_too_long:
+	warn("message %x too long (%u bytes)", message.head.code, message.head.length);
+	goto out;
+socket_error:
+	warn("socket error %i", err);
+	if (running(info))
+		goto connect;
+	goto out;
+}
+
+static int control(struct dm_target *target)
+{
+	struct devinfo *info = target->private;
+	struct messagebuf message; // !!! have a buffer in the target->info
+	struct file *sock;
+	int err, length;
+
+	daemonize("ddraid-control");
+	sock = info->control_socket;
+	trace(warn("got socket %p", sock);)
+
+	down(&info->exit3_sem);
+	while (running(info)) {
+		trace(warn("wait message");)
+		if ((err = readpipe(sock, &message.head, sizeof(message.head))))
+			goto socket_error;
+		trace(warn("got message header code %x", message.head.code);)
+		length = message.head.length;
+		if (length > maxbody)
+			goto message_too_long;
+		trace(warn("%x/%u", message.head.code, length);)
+		if ((err = readpipe(sock, &message.body, length)))
+			goto socket_error;
+	
+		switch (message.head.code) {
+		case CONNECT_SERVER: {
+			unsigned len = 4;
+			char bogus[len];
+			int sock_fd = get_unused_fd(), fd;
+
+			trace(warn("Received connect server");)
+			if (sock_fd < 0) {
+				warn("Can't get fd, error %i", sock_fd);
+				break;
+			}
+			fd_install(sock_fd, sock);
+			if ((fd = recv_fd(sock_fd, bogus, &len)) < 0) {
+				warn("recv_fd failed, error %i", fd);
+				put_unused_fd(sock_fd);
+				break;
+			}
+			trace(warn("Received socket %i", fd);)
+			info->sock = fget(fd);
+			current->files->fd[fd] = NULL; /* this is sooo hokey */
+			put_unused_fd(sock_fd);
+			sys_close(fd);
+			up(&info->server_in_sem);
+			outbead(info->sock, IDENTIFY, struct identify, .id = 6);
+			break;
+		}
+		default: 
+			warn("Unknown message %x", message.head.code);
+			continue;
+		}
+	}
+out:
+	up(&info->exit3_sem); /* !!! will crash if module unloaded before ret executes */
+	warn("%s exiting", current->comm);
+	return 0;
+message_too_long:
+	warn("message %x too long (%u bytes)", message.head.code, message.head.length);
+	goto out;
+socket_error:
+	warn("socket error %i", err);
+	goto out;
+}
+
+static int get_control_socket(char *sockname)
+{
+	mm_segment_t oldseg = get_fs();
+	struct sockaddr_un addr = { .sun_family = AF_UNIX };
+	int addr_len = sizeof(addr) - sizeof(addr.sun_path) + strlen(sockname); // !!! check too long
+	int sock = sys_socket(AF_UNIX, SOCK_STREAM, 0), err = 0;
+
+	trace(warn("Connect to control socket %s", sockname);)
+	if (sock <= 0)
+		return sock;
+	strncpy(addr.sun_path, sockname, sizeof(addr.sun_path));
+	if (sockname[0] == '@')
+		addr.sun_path[0] = 0;
+
+	set_fs(get_ds());
+	err = sys_connect(sock, (struct sockaddr *)&addr, addr_len);
+	set_fs(oldseg);
+	return err? err: sock;
+}
+
+static int shutdown_socket(struct file *socket)
+{
+	struct socket *sock = SOCKET_I(socket->f_dentry->d_inode);
+	return sock->ops->shutdown(sock, RCV_SHUTDOWN);
+}
+
+static int ddraid_status(struct dm_target *target, status_type_t type, char *result, unsigned maxlen)
+{
+	switch (type) {
+	case STATUSTYPE_INFO:
+	case STATUSTYPE_TABLE:
+		result[0] = '\0';
+		break;
+	}
+
+	return 0;
+}
+
+static void ddraid_destroy(struct dm_target *target)
+{
+	struct devinfo *info = target->private;
+	int i, err;
+
+	trace(warn("%p", target);)
+	if (!info)
+		return;
+
+	down(&info->destroy_sem);
+
+	/* Unblock helper threads */
+	info->flags |= FINISH_FLAG;
+	up(&info->server_in_sem); // unblock incoming thread
+	up(&info->server_out_sem); // unblock io request threads
+	up(&info->more_work_sem);
+
+	if (info->sock && (err = shutdown_socket(info->sock)))
+		warn("server socket shutdown error %i", err);
+	if (info->sock && (err = shutdown_socket(info->control_socket)))
+		warn("control socket shutdown error %i", err);
+
+	// !!! wrong! the thread might be just starting, think about this some more
+	// ah, don't let ddraid_destroy run while ddraid_create is spawning threads
+	down(&info->exit1_sem);
+	warn("thread 1 exited");
+	down(&info->exit2_sem);
+	warn("thread 2 exited");
+	down(&info->exit3_sem);
+	warn("thread 3 exited");
+
+	if (info->spare_region)
+		kmem_cache_free(region_cache, info->spare_region);
+	if (info->sock)
+		fput(info->sock);
+	for (i = 0; i < info->members; i++)
+		if (info->member[i])
+			dm_put_device(target, info->member[i]);
+	kfree(info);
+}
+
+static int ddraid_create(struct dm_target *target, unsigned argc, char **argv)
+{
+	struct devinfo *info;
+	sector_t member_len;
+	char *end;
+	int err, i, members = simple_strtoul(argv[0], &end, 10);
+	char *error;
+
+	err = -ENOMEM;
+	error = "Can't get kernel memory";
+	if (!(info = kmalloc(sizeof(struct devinfo), GFP_KERNEL)))
+		goto eek;
+
+	err = -EINVAL;
+	error = "ddraid usage: members device... sockname";
+	if (members > MAX_MEMBERS || members + 2 > argc)
+		goto eek;
+
+	error = "dm-stripe: Target length not divisable by number of members";
+	member_len = target->len;
+	*info = (struct devinfo){ .members = members, .region_size_bits = -1, .dead = -1 };
+#ifdef DDRAID
+	{
+	int n = members - 1, k = fls(n) - 1;
+
+	info->balance_den = 1 << 21;
+
+	if (sector_div(member_len, members - 1)) /* modifies arg1! */
+		goto eek;
+
+//	member_len += n;
+//	sector_div(member_len, members - 1); /* modifies arg1! */
+
+	error = "Invalid number of ddraid members (must be 2**k+1)";
+	if (members < 2 || (~(-1 << k) & n))
+		goto eek;
+
+	error = "Drive out of range";
+	if (info->dead >= members)
+		goto eek;
+
+	warn("Order %i ddraid", k);
+	info->blocksize_bits = PAGE_CACHE_SHIFT; // just for now
+	info->fragsize_bits = info->blocksize_bits - k;
+	}
+#endif
+	target->private = info;
+	sema_init(&info->destroy_sem, 1);
+	sema_init(&info->server_in_sem, 0);
+	sema_init(&info->server_out_sem, 0);
+	sema_init(&info->exit1_sem, 1);
+	sema_init(&info->exit2_sem, 1);
+	sema_init(&info->exit3_sem, 1);
+	sema_init(&info->more_work_sem, 0);
+	spin_lock_init(&info->region_lock);
+	spin_lock_init(&info->endio_lock);
+	INIT_LIST_HEAD(&info->requests);
+	INIT_LIST_HEAD(&info->releases);
+	INIT_LIST_HEAD(&info->bogus);
+	for (i = 0; i < HASH_BUCKETS; i++)
+		INIT_LIST_HEAD(&info->hash[i]);
+
+	error = "Can't connect control socket";
+	if ((err = get_control_socket(argv[argc - 1])) < 0)
+		goto eek;
+	info->control_socket = fget(err);
+	sys_close(err);
+
+	error = "Can't open ddraid member";
+	for (i = 0; i < members; i++)
+		if ((err = dm_get_device(target, argv[i + 1], 0, member_len,
+			dm_table_get_mode(target->table), &info->member[i])))
+			goto eek;
+
+	error = "Can't start daemon";
+	if ((err = kernel_thread((void *)incoming, target, CLONE_KERNEL)) < 0)
+		goto eek;
+	if ((err = kernel_thread((void *)worker, target, CLONE_KERNEL)) < 0)
+		goto eek;
+	if ((err = kernel_thread((void *)control, target, CLONE_KERNEL)) < 0)
+		goto eek;
+
+	warn("Created cluster raid device");
+//	target->split_io = 1 << MIN_REGION_BITS; /* goes away if we can start suspended */
+	return 0;
+
+eek:	warn("Device create error %i: %s!", err, error);
+	ddraid_destroy(target);
+	target->error = error;
+	return err;
+}
+
+static struct target_type ddraid = {
+	.name = "ddraid",
+	.version = {0, 0, 0},
+	.module = THIS_MODULE,
+	.ctr = ddraid_create,
+	.dtr = ddraid_destroy,
+	.map = ddraid_map_bogus,
+	.status = ddraid_status,
+};
+
+int __init dm_ddraid_init(void)
+{
+	int err;
+	char *what = "Target register";
+
+	if ((err = dm_register_target(&ddraid)))
+		goto bad1;
+	err = -ENOMEM;
+	what = "Cache create";
+	if (!(region_cache = kmem_cache_create("ddraid-region",
+		sizeof(struct region), __alignof__(struct region), 0, NULL, NULL)))
+		goto bad2;
+	if (!(gizmo_cache = kmem_cache_create("ddraid-gizmos",
+		sizeof(union gizmo), __alignof__(union gizmo), 0, NULL, NULL)))
+		goto bad3;
+	return 0;
+bad3:
+	kmem_cache_destroy(region_cache);
+bad2:
+	dm_unregister_target(&ddraid);
+bad1:
+	DMERR("%s failed", what);
+	return err;
+}
+
+void dm_ddraid_exit(void)
+{
+	int err;
+	if ((err = dm_unregister_target(&ddraid)))
+		DMERR("Unregister failed %d", err);
+	if (region_cache)
+		kmem_cache_destroy(region_cache);
+	if (gizmo_cache)
+		kmem_cache_destroy(gizmo_cache);
+}
+
+module_init(dm_ddraid_init);
+module_exit(dm_ddraid_exit);
diff -up --recursive 2.6.11.3.clean/drivers/md/dm-ddraid.h 2.6.11.3/drivers/md/dm-ddraid.h
--- 2.6.11.3.clean/drivers/md/dm-ddraid.h	2005-03-30 00:59:56.000000000 -0500
+++ 2.6.11.3/drivers/md/dm-ddraid.h	2005-04-14 15:36:16.000000000 -0400
@@ -0,0 +1,67 @@
+#define PACKED __attribute__ ((packed))
+
+struct head
+{
+	uint32_t code;
+	uint32_t length;
+};
+
+enum {
+	REPLY_ERROR = 0xbead0000,
+	NEED_SERVER,
+	CONNECT_SERVER,
+	REPLY_CONNECT_SERVER,
+	SERVER_READY,
+	START_SERVER,
+	SHUTDOWN_SERVER,
+	CONTROL_SOCKET,
+	IDENTIFY,
+	REPLY_IDENTIFY,
+	REQUEST_WRITE,
+	RELEASE_WRITE,
+	GRANT_SYNCED,
+	GRANT_UNSYNCED,
+	ADD_UNSYNCED,
+	DEL_UNSYNCED,
+	DRAIN_REGION,
+	SET_HIGHWATER,
+	SYNC_REGION,
+	REGION_SYNCED,
+	PAUSE_REQUESTS,
+	RESUME_REQUESTS,
+	BOUNCE_REQUEST,
+};
+
+typedef unsigned long region_t;
+
+struct identify { uint32_t id; } PACKED;
+struct region_message { region_t regnum; } PACKED;
+struct reply_identify { unsigned region_bits; } PACKED;
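+
+/* On the wire every message is a struct head followed by head.length bytes
+ * of body; e.g. REQUEST_WRITE carries one struct region_message, so the
+ * packet is eight bytes of head plus sizeof(region_t) of body.  (Note that
+ * region_t is a long, so the body size is architecture-dependent.) */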
+
+/* decruft me... !!! */
+#define maxbody 500
+struct messagebuf { struct head head; char body[maxbody]; };
+/* ...decruft me */
+
+// bios submitted before server arrives must be split conservatively (see "bogus")
+#define MIN_REGION_BITS 12
+
+/* The endian conversions that libc forgot */
+
+static inline uint64_t ntohll(uint64_t n)
+{
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+	return (((uint64_t)ntohl(n)) << 32) | ntohl(n >> 32);
+#else
+	return n; 
+#endif
+}
+
+static inline uint64_t htonll(uint64_t n)
+{
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+	return (((uint64_t)htonl(n)) << 32) | htonl(n >> 32);
+#else
+	return n; 
+#endif
+}
diff -up --recursive 2.6.11.3.clean/drivers/md/dm-ddsnap.c 2.6.11.3/drivers/md/dm-ddsnap.c
--- 2.6.11.3.clean/drivers/md/dm-ddsnap.c	2005-05-31 20:21:07.000000000 -0400
+++ 2.6.11.3/drivers/md/dm-ddsnap.c	2005-05-31 21:50:09.000000000 -0400
@@ -0,0 +1,1144 @@
+#include <linux/fs.h>
+#include <linux/slab.h>
+#include <linux/mm.h>
+#include <linux/module.h>
+#include <linux/pagemap.h>
+#include <linux/file.h>
+#include <linux/syscalls.h> // recvmsg
+#include <linux/socket.h>
+#include <linux/un.h>
+#include <net/sock.h>
+#include <asm/uaccess.h>
+#include <asm/bug.h>
+#include <linux/bio.h>
+#include "dm.h"
+#include "dm-ddsnap.h"
+
+#define warn(string, args...) do { printk("%s: " string "\n", __func__, ##args); } while (0)
+#define error(string, args...) do { warn(string, ##args); BUG(); } while (0)
+#define assert(expr) do { if (!(expr)) error("Assertion " #expr " failed!"); } while (0)
+#define trace_on(args) args
+#define trace_off(args)
+
+#define trace trace_off
+
+/*
+ * To do:
+ *
+ * - variable length bio handling
+ * - unique cache
+ * - receive chunk size
+ * - make pending and hook a union
+ * - get rid of multiple ranges per message misfeature
+ * - rationalize sector vs chunk usage in messages
+ * - detect message id wrap
+ * - detect message timeout
+ */
+
+/* Useful gizmos */
+
+static int rwpipe(struct file *file, const void *buffer, unsigned int count,
+	ssize_t (*op)(struct kiocb *, const char *, size_t, loff_t), int mode)
+{
+	struct kiocb iocb;
+	mm_segment_t oldseg;
+	int err = 0;
+	trace_off(warn("%s %i bytes", mode == FMODE_READ? "read": "write", count);)
+	if (!(file->f_mode & mode))
+		return -EBADF;
+	if (!op)
+		return -EINVAL;
+	init_sync_kiocb(&iocb, file); // new in 2.5 (hmm)
+	iocb.ki_pos = file->f_pos;
+	oldseg = get_fs();
+	set_fs(get_ds());
+	while (count) {
+		int chunk = (*op)(&iocb, buffer, count, iocb.ki_pos);
+		if (chunk <= 0) {
+			err = chunk? chunk: -EPIPE;
+			break;
+		}
+		BUG_ON(chunk > count);
+		count -= chunk;
+		buffer += chunk;
+	}
+	set_fs(oldseg);
+	file->f_pos = iocb.ki_pos;
+	return err;
+}
+
+static inline int readpipe(struct file *file, void *buffer, unsigned int count)
+{
+	return rwpipe(file, buffer, count, (void *)file->f_op->aio_read, FMODE_READ);
+}
+
+static inline int writepipe(struct file *file, void *buffer, unsigned int count)
+{
+	return rwpipe(file, buffer, count, file->f_op->aio_write, FMODE_WRITE);
+}
+
+#define outbead(SOCK, CODE, STRUCT, VALUES...) ({ \
+	struct { struct head head; STRUCT body; } PACKED message = \
+		{ { CODE, sizeof(STRUCT) }, { VALUES } }; \
+	writepipe(SOCK, &message, sizeof(message)); })
+
+/*
+ * This gets the job done but it sucks as an internal interface: there
+ * is no reason to deal with fds at all, we just want to receive the
+ * (struct file *), we do not want to have to wrap the socket in a
+ * fd just to call recv_fd, and user space pointer for the (bogus) data
+ * payload is just silly.  Never mind the danger of triggering some
+ * wierdo signal handling cruft deep in the socket layer.  This kind of
+ * posturing - lathering layers of cruft upon cruft - is the stuff
+ * Windows is made of, Linux is not supposed to be like that.  Fixing
+ * this requires delving into the SCM_RIGHTS path deep inside sys_recvmsg
+ * and breaking out the part that actually does the work, to be a usable
+ * internal interface.  Put it on the list of things to do.
+ */
+static int recv_fd(int sock, char *bogus, unsigned *len)
+{
+	char payload[CMSG_SPACE(sizeof(int))];
+	struct msghdr msg = {
+		.msg_control = payload,
+		.msg_controllen = sizeof(payload),
+		.msg_iov = &(struct iovec){ .iov_base = bogus, .iov_len = *len },
+		.msg_iovlen = 1,
+	};
+	mm_segment_t oldseg = get_fs();
+	struct cmsghdr *cmsg;
+	int result;
+
+	set_fs(get_ds());
+	result = sys_recvmsg(sock, &msg, 0);
+	set_fs(oldseg);
+
+	if (result <= 0)
+		return result;
+	if (!(cmsg = CMSG_FIRSTHDR(&msg)))
+		return -ENODATA;
+	if (cmsg->cmsg_len != CMSG_LEN(sizeof(int)) ||
+		cmsg->cmsg_level != SOL_SOCKET ||
+		cmsg->cmsg_type != SCM_RIGHTS)
+		return -EBADMSG;
+
+	*len = result;
+	return *((int *)CMSG_DATA(cmsg));
+}
+
+static void kick(struct block_device *dev)
+{
+	request_queue_t *q = bdev_get_queue(dev);
+	if (q->unplug_fn)
+		q->unplug_fn(q);
+}
+
+/* ...Useful gizmos */
+
+typedef u64 chunk_t;
+
+#define SECTOR_SHIFT 9
+#define IS_SNAP_FLAG (1 << 0)
+#define REPORT_BIT 1 /* bit number, for test_and_set_bit, i.e. flag value (1 << 1) */
+#define RECOVER_FLAG (1 << 2)
+#define FINISH_FLAG (1 << 3)
+#define NUM_BUCKETS 64
+#define MASK_BUCKETS (NUM_BUCKETS - 1)
+#define ID_BITS 16
+
+struct devinfo {
+	u64 id;
+	unsigned long flags;
+	unsigned chunksize_bits;
+	unsigned chunkshift;
+//	sector_t len;
+	int snap, nextid;
+	u32 *shared_bitmap; // !!! get rid of this, use the inode cache
+	struct inode  *inode; /* the cache */
+	struct dm_dev *orgdev;
+	struct dm_dev *snapdev;
+	struct file *sock;
+	struct file *control_socket;
+	struct semaphore server_in_sem;
+	struct semaphore server_out_sem;
+	struct semaphore more_work_sem;
+	struct semaphore recover_sem;
+	struct semaphore exit1_sem;
+	struct semaphore exit2_sem;
+	struct semaphore exit3_sem;
+	struct list_head pending[NUM_BUCKETS];
+	struct list_head queries;
+	struct list_head releases;
+	struct list_head locked;
+	spinlock_t pending_lock;
+	spinlock_t end_io_lock;
+	int dont_switch_lists;
+};
+
+static inline int is_snapshot(struct devinfo *info)
+{
+	return !!(info->flags & IS_SNAP_FLAG);
+}
+
+static inline int running(struct devinfo *info)
+{
+	return !(info->flags & FINISH_FLAG);
+}
+
+static inline int worker_running(struct devinfo *info)
+{
+	return !(info->flags & (FINISH_FLAG|RECOVER_FLAG));
+}
+
+static void report_error(struct devinfo *info)
+{
+	if (test_and_set_bit(REPORT_BIT, &info->flags))
+		return;
+	up(&info->more_work_sem);
+	down(&info->recover_sem);
+	info->flags |= RECOVER_FLAG;
+}
+
+/* Static caches, shared by all ddsnap instances */
+
+static kmem_cache_t *pending_cache;
+static kmem_cache_t *end_io_cache;
+static struct super_block *snapshot_super;
+
+/* We cache query results because we are greedy about speed */
+
+#ifdef CACHE
+static u64 *snap_map_cachep(struct address_space *mapping, chunk_t chunk, struct page **p)
+{
+	u32 page_index;
+	u32 page_pos;
+	struct page *page;
+	u64 *exceptions;
+
+	page_index = chunk / (PAGE_SIZE / sizeof(u64));
+	page_pos = chunk % (PAGE_SIZE / sizeof(u64));
+
+	page = find_or_create_page(mapping, page_index, GFP_KERNEL);
+	if (page) {
+		/* Clean page if it's a new one */
+		if (!Page_Uptodate(page)) {
+			memset(page_address(page), 0, PAGE_SIZE);
+			SetPageUptodate(page);
+		}
+
+		exceptions = page_address(page);
+		*p = page;
+		return &exceptions[page_pos];
+	}
+	return NULL;
+}
+
+static inline int get_unshared_bit(struct devinfo *info, chunk_t chunk)
+{
+	return (info->shared_bitmap[chunk >> 5] >> (chunk & 31)) & 1;
+}
+
+static inline void set_unshared_bit(struct devinfo *info, chunk_t chunk)
+{
+	info->shared_bitmap[chunk >> 5] |= 1 << (chunk & 31);
+}
+#endif
+
+/* Hash table matches up query replies to pending requests */
+
+struct pending {
+	unsigned id;
+	u64 chunk;
+	unsigned chunks;
+	struct bio *bio;
+	struct list_head list;
+};
+
+static void show_pending(struct devinfo *info)
+{
+	unsigned i, total = 0;
+
+	spin_lock(&info->pending_lock);
+	warn("Pending server queries...");
+	for (i = 0; i < NUM_BUCKETS; i++) {
+		struct list_head *list;
+		list_for_each(list, info->pending + i) {
+			struct pending *pending = list_entry(list, struct pending, list);
+			if (!total)
+				printk("[%u]: ", i);
+			printk("%u:%Lx ", pending->id, pending->chunk);
+			total++;
+		}
+	}
+	printk("(%u)\n", total);
+	if (!list_empty(&info->queries)) {
+		struct list_head *list;
+		total = 0;
+		warn("Queued queries...");
+		list_for_each(list, &info->queries) {
+			struct pending *pending = list_entry(list, struct pending, list);
+			printk("%Lx ", pending->chunk);
+			total++;
+		}
+		printk("(%u)\n", total);
+	}
+	spin_unlock(&info->pending_lock);
+}
+
+static inline unsigned hash_pending(unsigned id)
+{
+	return id & MASK_BUCKETS;
+}
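+
+/*
+ * Bucket math, by example: with NUM_BUCKETS = 64, ids 0x12 and 0x52
+ * both hash to bucket 0x12, so lookups still compare the full id
+ * while walking the bucket's list (see replied_rw below).
+ */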
+
+/* Ah, now it gets interesting.  Called in interrupt context */
+
+struct hook {
+	struct devinfo *info;
+	sector_t sector;
+	/* needed only for end_io, make it a union */
+	bio_end_io_t *old_end_io;
+	void *old_private;
+	/* needed after end_io, for release, make it a union */
+	struct list_head list;
+};
+
+static int snapshot_read_end_io(struct bio *bio, unsigned int done, int error)
+{
+	struct hook *hook = bio->bi_private;
+	struct devinfo *info = hook->info;
+
+	trace(warn("sector %Lx", (long long)hook->sector);)
+	spin_lock(&info->end_io_lock);
+	bio->bi_end_io = hook->old_end_io;
+	bio->bi_private = hook->old_private;
+	hook->old_end_io = NULL;
+	if (info->dont_switch_lists == 0)
+		list_move(&hook->list, &info->releases);
+	spin_unlock(&info->end_io_lock);
+	up(&info->more_work_sem);
+
+	return bio->bi_end_io(bio, done, error);
+}
+
+/* This is the part that does all the work. */
+
+int replied_rw(struct dm_target *target, struct rw_request *body, unsigned length, int rw, int snap)
+{
+	struct devinfo *info = target->private;
+	struct chunk_range *p = body->ranges;
+	unsigned shift = info->chunksize_bits - SECTOR_SHIFT, mask = (1 << shift) - 1;
+	int i, j, submitted = 0;
+
+	trace(show_pending(info);)
+	trace(warn("id = %u, %u ranges, %s %s", body->id, body->count,
+		rw == READ? "read from": "write to", snap? "snapshot": "origin");)
+
+	for (i = 0; i < body->count; i++) { // !!! check for length overrun
+		unsigned chunks = p->chunks, id = body->id;
+		struct list_head *list, *bucket = info->pending + hash_pending(id);
+		struct pending *pending;
+		struct bio *bio;
+
+		trace(warn("[%Lx/%x]", p->chunk, chunks);)
+		assert(chunks == 1);
+
+		spin_lock(&info->pending_lock);
+		list_for_each(list, bucket)
+			if ((pending = list_entry(list, struct pending, list))->id == id)
+				goto found;
+		warn("Can't find pending rw for chunk %u:%Lx", id, p->chunk);
+		spin_unlock(&info->pending_lock);
+		return -1;
+found:
+		list_del(&pending->list);
+		spin_unlock(&info->pending_lock);
+
+		bio = pending->bio;
+		trace(warn("Handle pending IO sector %Lx", (long long)bio->bi_sector);)
+
+		if (chunks != pending->chunks) {
+			warn("Message mismatch, expected %x got %x", chunks, chunks);
+			kmem_cache_free(pending_cache, pending);
+			bio_io_error(bio, bio->bi_size);
+			return -1;
+		}
+
+		++p;
+		if (snap) {
+			chunk_t *p2 = (chunk_t *)p;
+			for (j = 0; j < chunks; j++) {
+				u64 physical = (*p2++ << shift) + (bio->bi_sector & mask);
+				trace(warn("logical %Lx = physical %Lx", (u64)bio->bi_sector, physical));
+				bio->bi_bdev = info->snapdev->bdev;
+				bio->bi_sector = physical;
+			}
+			p = (struct chunk_range *)p2;
+		} else if (rw == READ) {
+			/* snapshot read from origin */
+			struct hook *hook;
+			trace(warn("hook end_io for %Lx", (long long)bio->bi_sector));
+			hook = kmem_cache_alloc(end_io_cache, GFP_KERNEL|__GFP_NOFAIL); // !!! union with pending
+			*hook = (struct hook){
+				.info = info,
+				.sector = bio->bi_sector,
+				.old_end_io = bio->bi_end_io,
+				.old_private = bio->bi_private };
+			bio->bi_end_io = snapshot_read_end_io;
+			bio->bi_private = hook;
+			list_add(&hook->list, &info->locked);
+		}
+
+		generic_make_request(bio);
+		submitted++;
+#ifdef CACHE
+		for (j = 0; j < chunks; j++)
+			set_unshared_bit(info, pending->chunk + j);
+#endif
+		kmem_cache_free(pending_cache, pending);
+	}
+	if (submitted) {
+		kick(info->orgdev->bdev);
+		kick(info->snapdev->bdev);
+	}
+	return 0;
+}
+
+/*
+ * There happen to be four flavors of server replies to rw queries, two
+ * write and two read, but the symmetry ends there.  Only one flavor
+ * (write) is for origin IO, because origin reads do not need global
+ * synchronization.  The remaining three flavors are for snapshot IO.
+ * Snapshot writes are always to the snapshot store, so there is only
+ * one flavor.  On the other hand, snapshot reads can be from either
+ * the origin or the snapshot store.  Only the server can know which.
+ * Either or both kinds of snapshot read reply are possible for a given
+ * query, which is where things get nasty.  These two kinds of replies
+ * can be interleaved arbitrarily along the original read request, and
+ * just to add a little more spice, the server may not send back the
+ * results for an entire query in one message (it may decide to service
+ * other queries first, or reply about the 'easiest' chunks first).  The
+ * client has to match up all these reply fragments to the original
+ * request and decide what to do.  Such bizarre fragmentation of the
+ * incoming request is unavoidable; it results from write access
+ * patterns to the origin.  We just have to grin and deal with it.  So
+ * without further ado, here is how the various reply flavors work:
+ *
+ * - Origin write replies just have logical ranges, since the origin
+ *   physical address is the same as the logical one.
+ *
+ * - Snapshot read replies come back in two separate messages, one for
+ *   the origin reads (if any) and one for the snapstore reads (if any),
+ *   the latter includes snapstore addresses.  Origin reads are globally
+ *   locked by the server, so we must send release messages on
+ *   completion.
+ *
+ * - Snapshot writes are always to the snapstore, so snapstore write
+ *   replies always include snapstore addresses.
+ *
+ * We know whether we're supposed to be a snapshot or origin client,
+ * but we only use that knowledge as a sanity check.  The message codes
+ * tell us explicitly whether the IO target is origin or snapstore.
+ */
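+
+/*
+ * A wire-layout sketch, inferred from replied_rw below and the structs
+ * in dm-ddsnap.h: a snapstore reply carrying one single-chunk range
+ * looks roughly like
+ *
+ *	struct head { .code = REPLY_SNAPSHOT_READ, .length = ... };
+ *	struct rw_request { .id = ..., .count = 1,
+ *		.ranges[0] = { .chunk = logical, .chunks = 1 } };
+ *	chunk_t physical;	// snapstore address, follows its range
+ *
+ * The origin flavors omit the trailing physical addresses, because
+ * there the logical chunk address is also the physical one.
+ */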
+
+/*
+ * For now, we just block on incoming message traffic, so this daemon
+ * can't do any other useful work.  It could if we used nonblocking pipe
+ * IO but we have been too lazy to implement it so far.  So we have one
+ * more daemon than we really need, and maybe we will get energetic one
+ * day soon and get rid of it.
+ *
+ * When it comes time to destroy things, the daemon has to be kicked
+ * out of its blocking wait, if it is in one, which it probably is.  We
+ * do that by shutting down the socket.  This unblocks the waiters and
+ * feeds them errors.  Does this work for all flavors of sockets?  I
+ * don't know.  It obviously should, but we've seen some pretty silly
+ * limitations in our long life, so nothing would surprise us at this
+ * point.
+ */
+static int incoming(struct dm_target *target)
+{
+	struct devinfo *info = target->private;
+	struct messagebuf message; // !!! have a buffer in the target->info
+	struct file *sock;
+	struct task_struct *task = current;
+	int err, length;
+
+	strcpy(task->comm, "ddsnap-client");
+	down(&info->exit2_sem);
+	trace(warn("Client thread started, pid=%i", current->pid);)
+connect:
+	trace(warn("Request socket connection");)
+	outbead(info->control_socket, NEED_SERVER, struct { });
+	trace(warn("Wait for socket connection");)
+	down(&info->server_in_sem);
+	trace(warn("got socket %p", info->sock);)
+	sock = info->sock;
+
+	while (running(info)) { // stop on module exit
+		int rw, to_snap;
+
+		trace(warn("wait message");)
+		if ((err = readpipe(sock, &message.head, sizeof(message.head))))
+			goto socket_error;
+		length = message.head.length;
+		if (length > maxbody)
+			goto message_too_long;
+		trace(warn("%x/%u", message.head.code, length);)
+		if ((err = readpipe(sock, &message.body, length)))
+			goto socket_error;
+	
+		switch (message.head.code) {
+		case REPLY_ORIGIN_WRITE:
+			rw = WRITE;
+			to_snap = 0;
+			break;
+
+		case REPLY_SNAPSHOT_WRITE:
+			rw = WRITE;
+			to_snap = 1;
+			break;
+
+		case REPLY_SNAPSHOT_READ_ORIGIN:
+			rw = READ;
+			to_snap = 0;
+			break;
+
+		case REPLY_SNAPSHOT_READ:
+			rw = READ;
+			to_snap = 1;
+			break;
+
+		case REPLY_IDENTIFY:
+			trace(warn("identify succeeded");)
+			up(&info->server_out_sem);
+			outbead(info->control_socket, REPLY_CONNECT_SERVER, struct { });
+			continue;
+
+		default: 
+			warn("Unknown message %x", message.head.code);
+			continue;
+		}
+		if (length < sizeof(struct rw_request))
+			goto message_too_short;
+
+		replied_rw(target, (void *)message.body, length, rw, to_snap);
+	}
+out:
+	up(&info->exit2_sem); /* !!! will crash if module unloaded before ret executes */
+	warn("%s exiting", task->comm);
+	return 0;
+message_too_long:
+	warn("message %x too long (%u bytes)", message.head.code, message.head.length);
+	goto out;
+message_too_short:
+	warn("message %x too short (%u bytes)", message.head.code, message.head.length);
+	goto out;
+socket_error:
+	warn("socket error %i", err);
+	if (!running(info))
+		goto out;
+
+	warn("halt worker");
+	report_error(info);
+	goto connect;
+}
+
+/*
+ * Here is our nonblocking worker daemon.  It handles all events other
+ * than incoming socket traffic.  At the moment, its only job is to
+ * send read release messages that can't be sent directly from the read
+ * end_io function, which executes in interrupt context.  But soon its
+ * duties will be expanded to include submitting IO that was blocked
+ * because no server pipe is connected yet, or something broke the
+ * pipe.  It may also have to resubmit some server queries, if the
+ * server dies for some reason and a new one is incarnated to take its
+ * place.  We also want to check for timed-out queries here.  Sure, we
+ * have heartbeating in the cluster, but why not have the guy who knows
+ * what to expect do the checking?  When we do detect timeouts, we will
+ * punt the complaint upstairs using some interface that hasn't been
+ * invented yet, because nobody has thought too deeply about what you
+ * need to do to detect faults really quickly and reliably.
+ *
+ * We throttle this daemon using a counting semaphore: each up on the
+ * semaphore causes the daemon to loop through its polling sequence
+ * once.  So we make sure we up the daemon's semaphore every time we
+ * queue an event.  The daemon may well process more than one event per
+ * cycle (we want that, actually, because then it can do things like
+ * message batching if it wants to) and will therefore end up looping
+ * a few times without doing any work.  This is harmless, and much much
+ * less nasty than missing an event.  When there are no pending events,
+ * the daemon sleeps peacefully.  Killing the daemon is easy, we just
+ * pull down the running flag and up the work semaphore, which causes
+ * our faithful worker to drop out the bottom.
+ */
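+
+/*
+ * The producer half of that bargain, as it appears in ddsnap_map
+ * below: queue the event under the lock, then up the semaphore once.
+ *
+ *	spin_lock(&info->pending_lock);
+ *	list_add(&pending->list, &info->queries);
+ *	spin_unlock(&info->pending_lock);
+ *	up(&info->more_work_sem);
+ */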
+void upload_locks(struct devinfo *info)
+{
+	unsigned long irqflags;
+	struct hook *hook;
+	struct list_head *entry, *tmp;
+
+	spin_lock_irqsave(&info->end_io_lock, irqflags);
+	info->dont_switch_lists = 1;
+	while (!list_empty(&info->releases)) {
+		entry = info->releases.prev;
+		hook = list_entry(entry, struct hook, list);
+		list_del(entry);
+		kmem_cache_free(end_io_cache, hook);
+	}
+	spin_unlock_irqrestore(&info->end_io_lock, irqflags);
+	list_for_each_safe(entry, tmp, &info->locked) {
+		chunk_t chunk;
+
+		hook = list_entry(entry, struct hook, list);
+		spin_lock_irqsave(&info->end_io_lock, irqflags);
+		if (hook->old_end_io == NULL) {
+			list_del(entry);
+			kmem_cache_free(end_io_cache, hook);
+			spin_unlock_irqrestore(&info->end_io_lock, irqflags);
+			continue;
+		}
+		spin_unlock_irqrestore(&info->end_io_lock, irqflags);
+		chunk = hook->sector >> info->chunkshift;
+		outbead(info->sock, UPLOAD_LOCK, struct rw_request1, .count = 1, .ranges[0].chunk = chunk, .ranges[0].chunks = 1);
+	}
+	outbead(info->sock, FINISH_UPLOAD_LOCK, struct {});
+	spin_lock_irqsave(&info->end_io_lock, irqflags);
+	list_for_each_safe(entry, tmp, &info->locked) {
+		hook = list_entry(entry, struct hook, list);
+		if (hook->old_end_io == NULL)
+			list_move(&hook->list, &info->releases);
+	}
+	info->dont_switch_lists = 0;
+	spin_unlock_irqrestore(&info->end_io_lock, irqflags);
+}
+
+static void requeue_queries(struct devinfo *info)
+{
+	unsigned i;
+
+	trace(show_pending(info);)
+	spin_lock(&info->pending_lock);
+	warn("");
+	for (i = 0; i < NUM_BUCKETS; i++) {
+		struct list_head *bucket = info->pending + i;
+
+		while (!list_empty(bucket)) {
+			struct list_head *entry = bucket->next;
+			struct pending *pending = list_entry(entry, struct pending, list);
+			trace_on(warn("requeue %u:%Lx", pending->id, pending->chunk);)
+
+			list_move(entry, &info->queries);
+			up(&info->more_work_sem);
+		}
+	}
+	spin_unlock(&info->pending_lock);
+	trace(show_pending(info);)
+}
+
+static int worker(struct dm_target *target)
+{
+	struct devinfo *info = target->private;
+	struct task_struct *task = current;
+	int err;
+
+	strcpy(task->comm, "ddsnap-worker");
+	trace(warn("Worker thread started, pid=%i", current->pid);)
+	down(&info->exit1_sem);
+	goto recover; /* just for now we'll always upload locks, even on fresh start */
+restart:
+	while (worker_running(info)) {
+		unsigned long irqflags;
+		down(&info->more_work_sem);
+
+		/* Send message for each pending request. */
+		spin_lock(&info->pending_lock);
+		while (!list_empty(&info->queries) && worker_running(info)) {
+			struct list_head *entry = info->queries.prev;
+			struct pending *pending = list_entry(entry, struct pending, list);
+
+			list_del(entry);
+			list_add(&pending->list, info->pending + hash_pending(pending->id));
+			spin_unlock(&info->pending_lock);
+			trace(show_pending(info);)
+
+			down(&info->server_out_sem);
+			trace(warn("Server query [%Lx/%x]", pending->chunk, pending->chunks);)
+			if ((err = outbead(info->sock,
+				bio_data_dir(pending->bio) == WRITE? QUERY_WRITE: QUERY_SNAPSHOT_READ,
+				struct rw_request1,
+					.id = pending->id, .count = 1,
+					.ranges[0].chunk = pending->chunk,
+					.ranges[0].chunks = pending->chunks)))
+				goto report;
+			up(&info->server_out_sem);
+			spin_lock(&info->pending_lock);
+		}
+		spin_unlock(&info->pending_lock);
+
+		/* Send message for each pending read release. */
+		spin_lock_irqsave(&info->end_io_lock, irqflags);
+		while (!list_empty(&info->releases) && worker_running(info)) {
+			struct list_head *entry = info->releases.prev;
+			struct hook *hook = list_entry(entry, struct hook, list);
+			chunk_t chunk = hook->sector >> info->chunkshift;
+
+			list_del(entry);
+			spin_unlock_irqrestore(&info->end_io_lock, irqflags);
+			trace(warn("release sector %Lx, chunk %Lx", (long long)hook->sector, chunk);)
+			kmem_cache_free(end_io_cache, hook);
+			down(&info->server_out_sem);
+			if ((err = outbead(info->sock, FINISH_SNAPSHOT_READ, struct rw_request1,
+				.count = 1, .ranges[0].chunk = chunk, .ranges[0].chunks = 1)))
+				goto report;
+			up(&info->server_out_sem);
+			spin_lock_irqsave(&info->end_io_lock, irqflags);
+		}
+		spin_unlock_irqrestore(&info->end_io_lock, irqflags);
+
+		trace(warn("Yowza! More work?");)
+	}
+	if ((info->flags & RECOVER_FLAG)) {
+		down(&info->server_out_sem);
+		up(&info->more_work_sem);
+		goto recover;
+	}
+finish:
+	up(&info->exit1_sem); /* !!! crashes if module unloaded before ret executes */
+	trace_on(warn("%s exiting", task->comm);)
+	return 0;
+
+report:
+	warn("worker socket error %i", err);
+	report_error(info);
+recover:
+	trace_on(warn("worker recovering");)
+	down(&info->recover_sem);
+	if ((info->flags & FINISH_FLAG))
+		goto finish;
+	if (is_snapshot(info))
+		upload_locks(info);
+	requeue_queries(info);
+	trace_on(warn("worker resuming");)
+
+	info->flags &= ~(RECOVER_FLAG|(1 << REPORT_BIT));
+	up(&info->recover_sem);
+	goto restart;
+}
+
+/*
+ * Yikes, a third daemon, that makes four including the user space
+ * monitor.  This daemon proliferation is due to not using poll, which
+ * we should fix at some point.  Or maybe we should wait for aio to
+ * work properly for sockets, and use that instead.  Either way, we
+ * can combine the two socket-waiting daemons into one, which will look
+ * nicer in ps.  Practically speaking, it doesn't matter a whole lot
+ * though, if we just stay lazy and have too many daemons.
+ *
+ * At the very least, this code should be combined with incoming, with
+ * just the switch cases different.
+ */
+static int control(struct dm_target *target)
+{
+	struct task_struct *task = current;
+	struct devinfo *info = target->private;
+	struct messagebuf message; // !!! have a buffer in the target->info
+	struct file *sock;
+	int err, length;
+
+	strcpy(task->comm, "ddsnap-control");
+	trace(warn("Control thread started, pid=%i", current->pid);)
+	sock = info->control_socket;
+	trace(warn("got socket %p", sock);)
+
+	down(&info->exit3_sem);
+	while (running(info)) {
+		trace(warn("wait message");)
+		if ((err = readpipe(sock, &message.head, sizeof(message.head))))
+			goto socket_error;
+		trace(warn("got message header code %x", message.head.code);)
+		length = message.head.length;
+		if (length > maxbody)
+			goto message_too_long;
+		trace(warn("%x/%u", message.head.code, length);)
+		if ((err = readpipe(sock, &message.body, length)))
+			goto socket_error;
+	
+		switch (message.head.code) {
+		case SET_IDENTITY:
+			info->id = ((struct set_id *)message.body)->id;
+			warn("id set: %Lu", info->id);
+			break;
+		case CONNECT_SERVER: {
+			unsigned len = 4;
+			char bogus[len];
+			int sock_fd = get_unused_fd(), fd;
+
+			if (sock_fd < 0) {
+				warn("Can't get fd, error %i", sock_fd);
+				break;
+			}
+			fd_install(sock_fd, sock);
+			if ((fd = recv_fd(sock_fd, bogus, &len)) < 0) {
+				warn("recv_fd failed, error %i", fd);
+				put_unused_fd(sock_fd);
+				break;
+			}
+			trace(warn("Received socket %i", fd);)
+			info->sock = fget(fd);
+			current->files->fd[fd] = NULL; /* this is sooo hokey */
+			put_unused_fd(sock_fd);
+			sys_close(fd);
+			up(&info->server_in_sem);
+			outbead(info->sock, IDENTIFY, struct identify, .id = info->id, .snap = info->snap);
+			up(&info->recover_sem); /* worker uploads locks now */
+			break;
+		}
+		default: 
+			warn("Unknown message %x", message.head.code);
+			continue;
+		}
+	}
+out:
+	up(&info->exit3_sem); /* !!! will crash if module unloaded before ret executes */
+	warn("%s exiting", task->comm);
+	return 0;
+message_too_long:
+	warn("message %x too long (%u bytes)", message.head.code, message.head.length);
+	goto out;
+socket_error:
+	warn("socket error %i", err);
+	goto out;
+}
+
+/*
+ * This is the device mapper mapping method, which does one of three things:
+ * (1) tells device mapper to go ahead and submit the request with a default
+ * identity mapping (return 1); (2) tells device mapper to forget about the
+ * request (return 0) and goes off and does its own thing; or (3) on a bad
+ * day, tells device mapper to fail the IO (return negative errno).
+ *
+ * This is pretty simple: we just hand any origin reads back to device mapper
+ * after filling in the origin device.  Then we check the cache to see
+ * if conditions are right to map the request locally; otherwise we need help
+ * from the server, so we remember the request in the pending hash and send
+ * off the appropriate server query.
+ *
+ * To make this a little more interesting, our server connection may be broken
+ * at the moment, or may not have been established yet, in which case we have
+ * to defer the request until the server becomes available.
+ */
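+
+/*
+ * For reference, case (1) by itself is just an identity target; a
+ * minimal sketch of the contract (not this driver's method, and
+ * "somedev" is a stand-in for a real dm_dev):
+ *
+ *	static int identity_map(struct dm_target *target, struct bio *bio,
+ *		union map_info *context)
+ *	{
+ *		bio->bi_bdev = somedev->bdev;
+ *		return 1;	// let device mapper submit it
+ *	}
+ */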
+static int ddsnap_map(struct dm_target *target, struct bio *bio, union map_info *context)
+{
+	struct devinfo *info = target->private;
+	struct pending *pending;
+	chunk_t chunk;
+	unsigned id;
+
+	bio->bi_bdev = info->orgdev->bdev;
+	if (bio_data_dir(bio) == READ && !is_snapshot(info))
+		return 1;
+
+	chunk = bio->bi_sector >> info->chunkshift;
+	trace(warn("map %Lx/%x, chunk %Lx", (long long)bio->bi_sector, bio->bi_size, chunk);)
+	assert(bio->bi_size <= 1 << info->chunksize_bits);
+#ifdef CACHE
+	if (is_snapshot(info)) { // !!! use page cache for both
+		struct page *page;
+		u64 *exception = snap_map_cachep(info->inode->i_mapping, chunk, &page);
+	
+		if (!exception) {
+			printk("Failed to get a page for sector %ld\n", bio->bi_sector);
+			return -1;
+		}
+
+		u64 exp_chunk = *exception;
+		UnlockPage(page);
+		if (exp_chunk) {
+			bio->bi_sector = bio->bi_sector + ((exp_chunk - chunk) << info->chunkshift);
+			return 1;
+		}
+	} else {
+		if (info->shared_bitmap && get_unshared_bit(info, chunk))
+			return 1;
+	}
+#endif
+	id = info->nextid;
+	info->nextid = (id + 1) & ~(-1 << ID_BITS);
+	pending = kmem_cache_alloc(pending_cache, GFP_NOIO|__GFP_NOFAIL);
+	*pending = (struct pending){ .id = id, .bio = bio, .chunk = chunk, .chunks = 1 };
+	spin_lock(&info->pending_lock);
+	list_add(&pending->list, &info->queries);
+	spin_unlock(&info->pending_lock);
+	up(&info->more_work_sem);
+	return 0;
+}
+
+/*
+ * Carefully crafted not to care about how far we got in the process
+ * of instantiating our client.  As such, it serves both for error
+ * abort and device unload destruction.  We have to scour our little
+ * world for resources and give them all back, including any pending
+ * requests, context structures and daemons.  The latter have to be
+ * convinced to exit on demand, and we must be sure they have exited,
+ * so we synchronize that with semaphores.  This isn't 100% foolproof;
+ * there is still the possibility that the destructor could gain
+ * control between the time a daemon ups its exit semaphore and when
+ * it has actually returned to its caller.  In that case, the module
+ * could be unloaded and the exiting thread will segfault.  This is
+ * a basic flaw in Linux that I hope to get around to fixing at some
+ * point, one way or another.
+ */
+static int shutdown_socket(struct file *socket)
+{
+	struct socket *sock = SOCKET_I(socket->f_dentry->d_inode);
+	return sock->ops->shutdown(sock, RCV_SHUTDOWN);
+}
+
+static void ddsnap_destroy(struct dm_target *target)
+{
+	struct devinfo *info = target->private;
+	int err; /* I have no mouth but I must scream */
+
+	trace(warn("%p", target);)
+	if (!info)
+		return;
+
+	/* Unblock helper threads */
+	info->flags |= FINISH_FLAG;
+	up(&info->server_in_sem); // unblock incoming thread
+	up(&info->server_out_sem); // unblock io request threads
+	up(&info->recover_sem); // unblock worker recovery
+
+	if (info->sock && (err = shutdown_socket(info->sock)))
+		warn("server socket shutdown error %i", err);
+	if (info->control_socket && (err = shutdown_socket(info->control_socket)))
+		warn("control socket shutdown error %i", err);
+
+	up(&info->more_work_sem);
+
+	// !!! wrong! the thread might be just starting, think about this some more
+	// ah, don't let ddsnap_destroy run while ddsnap_create is spawning threads
+	down(&info->exit1_sem);
+	warn("thread 1 exited");
+	down(&info->exit2_sem);
+	warn("thread 2 exited");
+	down(&info->exit3_sem);
+	warn("thread 3 exited");
+
+	if (info->sock)
+		fput(info->sock);
+	if (info->inode)
+		iput(info->inode);
+	if (info->shared_bitmap)
+		vfree(info->shared_bitmap);
+	if (info->snapdev)
+		dm_put_device(target, info->snapdev);
+	if (info->orgdev)
+		dm_put_device(target, info->orgdev);
+	kfree(info);
+}
+
+/*
+ * Woohoo, we are going to instantiate a new cluster snapshot virtual
+ * device, what fun.
+ */
+static int get_control_socket(char *sockname)
+{
+	mm_segment_t oldseg = get_fs();
+	struct sockaddr_un addr = { .sun_family = AF_UNIX };
+	int addr_len = sizeof(addr) - sizeof(addr.sun_path) + strlen(sockname); // !!! check too long
+	int sock = sys_socket(AF_UNIX, SOCK_STREAM, 0), err = 0;
+
+	trace(warn("Connect to control socket %s", sockname);)
+	if (sock <= 0)
+		return sock;
+	strncpy(addr.sun_path, sockname, sizeof(addr.sun_path));
+	if (sockname[0] == '@')
+		addr.sun_path[0] = 0;
+
+	set_fs(get_ds());
+	err = sys_connect(sock, (struct sockaddr *)&addr, addr_len);
+	/* we could retry with yield() on -ECONNREFUSED; for now just fail */
+	set_fs(oldseg);
+
+	return err? err: sock;
+}
+
+/*
+ * Round up to nearest 2**k boundary
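+ * e.g. round_up(5, 4) == ((5 + 3) & ~3) == 8; size must be a power of two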
+ * !!! lose this
+ */
+static inline ulong round_up(ulong n, ulong size)
+{
+	return (n + size - 1) & ~(size - 1);
+}
+
+static int ddsnap_create(struct dm_target *target, unsigned argc, char **argv)
+{
+	u64 chunksize_bits = 12; // !!! when chunksize isn't always 4K, have to move all this to identify reply handler
+	struct devinfo *info;
+	int err, i, snap, flags = 0;
+	char *error;
+#ifdef CACHE
+	unsigned bm_size;
+#endif
+
+	error = "ddsnap usage: orgdev snapdev sockname snapnum";
+	err = -EINVAL;
+	if (argc != 4)
+		goto eek;
+
+	snap = simple_strtol(argv[3], NULL, 0);
+	if (snap >= 0)
+		flags |= IS_SNAP_FLAG;
+
+	err = -ENOMEM;
+	error = "can't get kernel memory";
+	if (!(info = kmalloc(sizeof(struct devinfo), GFP_KERNEL)))
+		goto eek;
+
+	*info = (struct devinfo){ 
+		.flags = flags, .snap = snap,
+		.chunksize_bits = chunksize_bits,
+		.chunkshift = chunksize_bits - SECTOR_SHIFT};
+	target->private = info;
+	sema_init(&info->server_in_sem, 0);
+	sema_init(&info->server_out_sem, 0);
+	sema_init(&info->recover_sem, 0);
+	sema_init(&info->exit1_sem, 1);
+	sema_init(&info->exit2_sem, 1);
+	sema_init(&info->exit3_sem, 1);
+	sema_init(&info->more_work_sem, 0);
+	spin_lock_init(&info->pending_lock);
+	spin_lock_init(&info->end_io_lock);
+	INIT_LIST_HEAD(&info->queries);
+	INIT_LIST_HEAD(&info->releases);
+	INIT_LIST_HEAD(&info->locked);
+	for (i = 0; i < NUM_BUCKETS; i++)
+		INIT_LIST_HEAD(&info->pending[i]);
+
+	error = "Can't get snapshot device";
+	if ((err = dm_get_device(target, argv[0], 0, target->len, dm_table_get_mode(target->table), &info->snapdev)))
+		goto eek;
+	error = "Can't get origin device";
+	if ((err = dm_get_device(target, argv[1], 0, target->len, dm_table_get_mode(target->table), &info->orgdev)))
+		goto eek;
+	error = "Can't connect control socket";
+	if ((err = get_control_socket(argv[2])) < 0)
+		goto eek;
+	info->control_socket = fget(err);
+	sys_close(err);
+
+#ifdef CACHE
+	bm_size = round_up((target->len  + 7) >> (chunksize_bits + 3), sizeof(u32)); // !!! wrong
+	error = "Can't allocate bitmap for origin";
+	if (!(info->shared_bitmap = vmalloc(bm_size)))
+		goto eek;
+	memset(info->shared_bitmap, 0, bm_size);
+	if (!(info->inode = new_inode(snapshot_super)))
+		goto eek;
+#endif
+
+	error = "Can't start daemon";
+	if ((err = kernel_thread((void *)incoming, target, CLONE_KERNEL)) < 0)
+		goto eek;
+	if ((err = kernel_thread((void *)worker, target, CLONE_KERNEL)) < 0)
+		goto eek;
+	if ((err = kernel_thread((void *)control, target, CLONE_KERNEL)) < 0)
+		goto eek;
+	warn("Created snapshot device origin=%s snapstore=%s socket=%s snapshot=%i", argv[0], argv[1], argv[2], snap);
+	target->split_io = 1 << info->chunkshift; // !!! lose this as soon as possible
+	return 0;
+
+eek:	warn("Virtual device create error %i: %s!", err, error);
+	ddsnap_destroy(target);
+	target->error = error;
+	return err;
+
+	{ void *useme = show_pending; useme = useme; } /* keep show_pending referenced when tracing is off */
+}
+
+/* Is this actually useful?  It's really trying to be a message */
+
+static int ddsnap_status(struct dm_target *target, status_type_t type, char *result, unsigned int maxlen)
+{
+	char orgbuffer[32];
+	char snapbuffer[32];
+	struct devinfo *info = target->private;
+
+	switch (type) {
+	case STATUSTYPE_INFO:
+		result[0] = '\0';
+		break;
+
+	case STATUSTYPE_TABLE:
+		format_dev_t(orgbuffer, info->orgdev->bdev->bd_dev);
+		format_dev_t(snapbuffer, info->snapdev->bdev->bd_dev);
+		snprintf(result, maxlen, "%s %s %u",
+			 orgbuffer, snapbuffer, 1 << info->chunksize_bits);
+		break;
+	}
+
+	return 0;
+}
+
+static struct target_type ddsnap = {
+	.name = "ddsnap",
+	.version = {0, 0, 0},
+	.module = THIS_MODULE,
+	.ctr = ddsnap_create,
+	.dtr = ddsnap_destroy,
+	.map = ddsnap_map,
+	.status = ddsnap_status,
+};
+
+int __init dm_ddsnap_init(void)
+{
+	int err = -ENOMEM;
+	char *what = "Cache create";
+	if (!(pending_cache = kmem_cache_create("ddsnap-pending",
+		sizeof(struct pending), __alignof__(struct pending), 0, NULL, NULL)))
+		goto bad1;
+	if (!(end_io_cache = kmem_cache_create("ddsnap-endio",
+		sizeof(struct hook), __alignof__(struct hook), 0, NULL, NULL)))
+		goto bad2;
+	what = "register";
+	if ((err = dm_register_target(&ddsnap)))
+		goto bad3;
+#ifdef CACHE
+	err = -ENOMEM;
+	what = "create snapshot superblock";
+	if (!(snapshot_super = alloc_super()))
+		goto bad4;
+#endif
+	return 0;
+
+#ifdef CACHE
+bad4:
+	dm_unregister_target(&ddsnap);
+#endif
+bad3:
+	kmem_cache_destroy(end_io_cache);
+bad2:
+	kmem_cache_destroy(pending_cache);
+bad1:
+	DMERR("%s failed\n", what);
+	return err;
+}
+
+void dm_ddsnap_exit(void)
+{
+	int err;
+	if ((err = dm_unregister_target(&ddsnap)))
+		DMERR("Snapshot unregister failed %d", err);
+	if (pending_cache)
+		kmem_cache_destroy(pending_cache);
+	if (end_io_cache)
+		kmem_cache_destroy(end_io_cache);
+	kfree(snapshot_super);
+}
+
+module_init(dm_ddsnap_init);
+module_exit(dm_ddsnap_exit);
+
+MODULE_LICENSE("GPL")
diff -up --recursive 2.6.11.3.clean/drivers/md/dm-ddsnap.h 2.6.11.3/drivers/md/dm-ddsnap.h
--- 2.6.11.3.clean/drivers/md/dm-ddsnap.h	2005-05-31 20:21:10.000000000 -0400
+++ 2.6.11.3/drivers/md/dm-ddsnap.h	2005-05-31 18:10:10.000000000 -0400
@@ -0,0 +1,94 @@
+#define PACKED __attribute__ ((packed))
+#define MAGIC  0xadbe
+
+struct head { uint32_t code; uint32_t length; } PACKED;
+
+enum csnap_codes
+{
+	REPLY_ERROR = 0xbead0000,
+	IDENTIFY,
+	REPLY_IDENTIFY,
+	QUERY_WRITE,
+	REPLY_ORIGIN_WRITE,
+	REPLY_SNAPSHOT_WRITE,
+	QUERY_SNAPSHOT_READ,
+	REPLY_SNAPSHOT_READ,
+	REPLY_SNAPSHOT_READ_ORIGIN,
+	FINISH_SNAPSHOT_READ,
+	CREATE_SNAPSHOT,
+	REPLY_CREATE_SNAPSHOT,
+	DELETE_SNAPSHOT,
+	REPLY_DELETE_SNAPSHOT,
+	DUMP_TREE,
+	INITIALIZE_SNAPSTORE,
+	NEED_SERVER,
+	CONNECT_SERVER,
+	REPLY_CONNECT_SERVER,
+	CONTROL_SOCKET,
+	SERVER_READY,
+	START_SERVER,
+	SHUTDOWN_SERVER,
+	SET_IDENTITY,
+	UPLOAD_LOCK,
+	FINISH_UPLOAD_LOCK,
+	NEED_CLIENTS,
+	UPLOAD_CLIENT_ID,
+	FINISH_UPLOAD_CLIENT_ID,
+	REMOVE_CLIENT_IDS,
+	LIST_SNAPSHOTS,
+	SNAPSHOT_LIST,
+};
+
+struct match_id { uint64_t id; uint64_t mask; } PACKED;
+struct set_id { uint64_t id; } PACKED;
+struct identify { uint64_t id; int32_t snap; } PACKED;
+struct create_snapshot { uint32_t snap; } PACKED;
+struct snapinfo { uint64_t snap; int8_t prio; char zero[3]; uint64_t ctime; } PACKED;
+struct snaplist { uint32_t count; struct snapinfo snapshots[]; } PACKED;
+
+typedef uint16_t shortcount; /* !!! what is this all about */
+
+struct rw_request
+{
+	uint16_t id;
+	shortcount count;
+	struct chunk_range
+	{
+		uint64_t chunk;
+		shortcount chunks;
+	} PACKED ranges[];
+} PACKED;
+
+/* !!! can there be only one flavor of me please */
+struct rw_request1
+{
+	uint16_t id;
+	shortcount count;
+	struct chunk_range PACKED ranges[1];
+} PACKED;
+
+/* decruft me... !!! */
+#define maxbody 500
+struct rwmessage { struct head head; struct rw_request body; };
+struct messagebuf { struct head head; char body[maxbody]; };
+/* ...decruft me */
+
+/* The endian conversions that libc forgot */
+
+static inline uint64_t ntohll(uint64_t n)
+{
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+	return (((uint64_t)ntohl(n)) << 32) | ntohl(n >> 32);
+#else
+	return n; 
+#endif
+}
+
+static inline uint64_t htonll(uint64_t n)
+{
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+	return (((uint64_t)htonl(n)) << 32) | htonl(n >> 32);
+#else
+	return n; 
+#endif
+}
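+
+/*
+ * Sanity check: on a little endian box,
+ * htonll(0x0102030405060708ULL) == 0x0807060504030201ULL,
+ * and ntohll(htonll(x)) == x on either endianness.
+ */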
diff -up --recursive 2.6.11.3.clean/drivers/md/dm-loop.c 2.6.11.3/drivers/md/dm-loop.c
--- 2.6.11.3.clean/drivers/md/dm-loop.c	2005-04-10 16:32:47.000000000 -0400
+++ 2.6.11.3/drivers/md/dm-loop.c	2005-04-12 16:10:20.000000000 -0400
@@ -0,0 +1,254 @@
+#include <linux/fs.h>
+#include <linux/slab.h>
+#include <linux/file.h>
+#include <linux/bio.h>
+#include <linux/module.h>
+#include "dm.h"
+
+#define warn(string, args...) do { printk("%s: " string "\n", __func__, ##args); } while (0)
+#define trace_on(args) args
+#define trace_off(args)
+
+#define trace trace_off
+
+#define SECTOR_SHIFT 9
+#define MAX_MEMBERS 1
+#define FINISH_FLAG 1
+#define DIRECT 1
+
+static int io(struct file *file, void *buffer, unsigned count, loff_t pos,
+	ssize_t (*op)(struct kiocb *, char *, size_t, loff_t), int mode)
+{
+	struct kiocb iocb;
+	mm_segment_t oldseg = get_fs();
+
+	if (!(file->f_mode & mode))
+		return -EPERM;
+	if (!op)
+		return -EINVAL;
+
+	init_sync_kiocb(&iocb, file);
+	iocb.ki_pos = pos;
+	set_fs(get_ds());
+	while (count) {
+		int bytes = (*op)(&iocb, buffer, count, iocb.ki_pos);
+		if (bytes <= 0) {
+			set_fs(oldseg); /* don't leak the kernel segment on error */
+			return bytes? bytes: -EIO;
+		}
+		BUG_ON(bytes > count);
+		buffer += bytes;
+		count -= bytes;
+	}
+	set_fs(oldseg);
+	return 0;
+}
+
+static int pread(struct file *file, void *buffer, unsigned count, loff_t pos)
+{
+	return io(file, buffer, count, pos, file->f_op->aio_read, FMODE_READ);
+}
+
+static int pwrite(struct file *file, void *buffer, unsigned count, loff_t pos)
+{
+	return io(file, buffer, count, pos, (void *)file->f_op->aio_write, FMODE_WRITE);
+}
+
+struct devinfo {
+	spinlock_t defer_lock;
+	unsigned flags, members;
+	struct semaphore more_work_sem, work_exit_sem;
+	struct dm_dev *member[MAX_MEMBERS];
+	struct list_head defer;
+	struct file *file;
+	char bounce[PAGE_CACHE_SIZE];
+};
+
+struct defer { struct list_head list; struct bio *bio; };
+static kmem_cache_t *defer_cache;
+
+static int worker(struct dm_target *target)
+{
+	struct devinfo *info = target->private;
+
+	daemonize("dmloop-worker");
+	down(&info->work_exit_sem);
+	while (1) {
+		down(&info->more_work_sem);
+
+		if ((info->flags & FINISH_FLAG)) {
+			up(&info->work_exit_sem); // !!! crashes if module unloaded before ret executes
+			return 0;
+		}
+
+		spin_lock(&info->defer_lock);
+		while (!list_empty(&info->defer)) {
+			struct defer *defer = list_entry(info->defer.next, struct defer, list);
+			struct bio *bio = defer->bio;
+			struct bio_vec *vec = bio->bi_io_vec;
+			loff_t start = bio->bi_sector << SECTOR_SHIFT, pos = start;
+			int i, err = 0;
+
+			/* pread/pwrite sleep, so drop the spinlock while we do the IO */
+			list_del(&defer->list);
+			spin_unlock(&info->defer_lock);
+
+			for (i = 0; i < bio->bi_vcnt; i++, vec++) {
+				char *buf = info->bounce;
+				struct page *page = vec->bv_page;
+				unsigned off = vec->bv_offset, len = vec->bv_len;
+				void *ppage;
+
+				trace(warn("%s %Lx/%x", bio_data_dir(bio) == READ? "read": "write", (long long)pos, len);)
+				if (bio_data_dir(bio) == READ) {
+					if ((err = pread(info->file, buf, len, pos)))
+						break;
+					ppage = kmap_atomic(page, KM_USER0);
+					memcpy(ppage + off, buf, len);
+					kunmap_atomic(ppage, KM_USER0);
+				} else {
+					ppage = kmap_atomic(page, KM_USER0);
+					memcpy(buf, ppage + off, len);
+					kunmap_atomic(ppage, KM_USER0);
+					if ((err = pwrite(info->file, buf, len, pos)))
+						break;
+				}
+				pos += len;
+			}
+			if (err) warn("R/W error %i", err);
+			bio_endio(bio, pos - start, err);
+			kmem_cache_free(defer_cache, defer);
+			spin_lock(&info->defer_lock);
+		}
+		spin_unlock(&info->defer_lock);
+	}
+}
+
+static int dev_map(struct dm_target *target, struct bio *bio, union map_info *context)
+{
+	struct devinfo *info = target->private;
+	struct defer *defer = kmem_cache_alloc(defer_cache, GFP_NOIO|__GFP_NOFAIL);
+
+	trace(warn("map %Lx/%x", (long long)bio->bi_sector, bio->bi_size);)
+	*defer = (struct defer){ .bio = bio };
+	spin_lock(&info->defer_lock);
+	list_add_tail(&defer->list, &info->defer);
+	spin_unlock(&info->defer_lock);
+	up(&info->more_work_sem);
+	return 0;
+}
+
+static int dev_status(struct dm_target *target, status_type_t type, char *result, unsigned maxlen)
+{
+	result[0] = '\0';
+	return 0;
+}
+
+static void dev_destroy(struct dm_target *target)
+{
+	struct devinfo *info = target->private;
+	int i;
+
+	trace(warn("");)
+	if (!info)
+		return;
+
+	info->flags |= FINISH_FLAG;
+	up(&info->more_work_sem); /* unblock it */
+
+	// !!! wrong! the thread might be just starting, think about this some more
+	// ah, don't let dev_destroy run while dev_create is spawning threads
+	down(&info->work_exit_sem);
+	warn("thread 1 exited");
+
+	for (i = 0; i < info->members; i++)
+		if (info->member[i])
+			dm_put_device(target, info->member[i]);
+
+	if (info->file)
+		fput(info->file);
+
+	kfree(info);
+}
+
+static int dev_create(struct dm_target *target, unsigned argc, char **argv)
+{
+	struct devinfo *info;
+	char *error;
+	int err, flags = O_RDWR | (DIRECT? O_DIRECT: 0);
+
+	err = -EINVAL;
+	error = "loop usage: device file";
+	if (argc != 1)
+		goto eek;
+
+	err = -ENOMEM;
+	error = "Can't get kernel memory";
+	if (!(info = kmalloc(sizeof(struct devinfo), GFP_KERNEL)))
+		goto eek;
+
+	*info = (struct devinfo){ .members = 1 };
+	sema_init(&info->work_exit_sem, 1);
+	sema_init(&info->more_work_sem, 0);
+	spin_lock_init(&info->defer_lock);
+	INIT_LIST_HEAD(&info->defer);
+	target->private = info;
+
+	error = "Can't open loop file";
+	if (IS_ERR(info->file = filp_open(argv[0], flags, 0))) { /* O_DIRECT goes in flags, not perms */
+		err = PTR_ERR(info->file);
+		goto eek;
+	}
+
+	err = -EPERM;
+	if (!info->file)
+		goto eek;
+
+	err = -ENOMEM;
+	error = "Can't start daemon";
+	if ((err = kernel_thread((void *)worker, target, CLONE_KERNEL)) < 0)
+		goto eek;
+
+	trace(warn("Created loop device on fd %s", argv[0]);)
+	return 0;
+eek:
+	warn("Error %i creating device, %s!", err, error);
+	dev_destroy(target);
+	target->error = error;
+	return err;
+}
+
+static struct target_type loop = {
+	.name = "loop",
+	.version = {0, 0, 0},
+	.module = THIS_MODULE,
+	.ctr = dev_create,
+	.dtr = dev_destroy,
+	.map = dev_map,
+	.status = dev_status,
+};
+
+int __init dm_loop_init(void)
+{
+	char *what = "Device registration";
+	int err = dm_register_target(&loop);
+
+	if (err)
+		goto bad1;
+	err = -ENOMEM;
+	what = "Cache create";
+	if (!(defer_cache = kmem_cache_create("dmloop-defer",
+		sizeof(struct defer), __alignof__(struct defer), 0, NULL, NULL)))
+		goto bad2;
+	return 0;
+bad2:
+	dm_unregister_target(&loop);
+bad1:
+	DMERR("%s failed\n", what);
+	return err;
+}
+
+void dm_loop_exit(void)
+{
+	int err;
+	if ((err = dm_unregister_target(&loop)))
+		DMERR("Unregister failed %d", err);
+	if (defer_cache)
+		kmem_cache_destroy(defer_cache);
+}
+
+module_init(dm_loop_init);
+module_exit(dm_loop_exit);
diff -up --recursive 2.6.11.3.clean/fs/Kconfig 2.6.11.3/fs/Kconfig
--- 2.6.11.3.clean/fs/Kconfig	2005-03-13 01:44:28.000000000 -0500
+++ 2.6.11.3/fs/Kconfig	2005-04-12 13:43:00.000000000 -0400
@@ -295,13 +295,13 @@ config JFS_STATISTICS
 	  to be made available to the user in the /proc/fs/jfs/ directory.
 
 config FS_POSIX_ACL
-# Posix ACL utility routines (for now, only ext2/ext3/jfs/reiserfs)
+# Posix ACL utility routines (for now, only ext2/ext3/jfs/reiserfs/GFS)
 #
 # NOTE: you can implement Posix ACLs without these helpers (XFS does).
 # 	Never use this symbol for ifdefs.
 #
 	bool
-	depends on EXT2_FS_POSIX_ACL || EXT3_FS_POSIX_ACL || JFS_POSIX_ACL || REISERFS_FS_POSIX_ACL || NFSD_V4
+	depends on EXT2_FS_POSIX_ACL || EXT3_FS_POSIX_ACL || JFS_POSIX_ACL || REISERFS_FS_POSIX_ACL || NFSD_V4 || GFS_FS
 	default y
 
 source "fs/xfs/Kconfig"
@@ -1714,6 +1714,46 @@ config AFS_FS
 config RXRPC
 	tristate
 
+config LOCK_HARNESS
+	tristate "GFS Lock Harness"
+	help
+	  The module that connects GFS to the modules that provide
+	  locking for GFS.
+
+	  If you want to use GFS (a cluster filesystem) say Y here.
+
+config GFS_FS
+	tristate "GFS file system support"
+	depends on LOCK_HARNESS
+	help
+	  A cluster filesystem.
+
+	  Allows a cluster of computers to simultaneously use a block device
+	  that is shared between them (with FC, iSCSI, NBD, etc...).  GFS reads
+	  and writes to the block device like a local filesystem, but also uses
+	  a lock module to allow the computers to coordinate their I/O so
+	  filesystem consistency is maintained.  One of the nifty features of
+	  GFS is perfect consistency -- changes made to the filesystem on one
+	  machine show up immediately on all other machines in the cluster.
+
+config LOCK_NOLOCK
+	tristate "Lock Nolock"
+	depends on LOCK_HARNESS
+	help
+	  A "fake" lock module that allows GFS to run as a local filesystem.
+
+config LOCK_DLM
+	tristate "Lock DLM"
+	depends on LOCK_HARNESS
+	help
+	  A lock module that allows GFS to use a Distributed Lock Manager.
+
+config LOCK_GULM
+	tristate "Lock GULM"
+	depends on LOCK_HARNESS
+	help
+	  A lock module that allows GFS to use a Failover Lock Manager.
+
 endmenu
 
 menu "Partition Types"
diff -up --recursive 2.6.11.3.clean/fs/Makefile 2.6.11.3/fs/Makefile
--- 2.6.11.3.clean/fs/Makefile	2005-03-13 01:44:28.000000000 -0500
+++ 2.6.11.3/fs/Makefile	2005-04-12 13:43:00.000000000 -0400
@@ -95,3 +95,5 @@ obj-$(CONFIG_BEFS_FS)		+= befs/
 obj-$(CONFIG_HOSTFS)		+= hostfs/
 obj-$(CONFIG_HPPFS)		+= hppfs/
 obj-$(CONFIG_DEBUG_FS)		+= debugfs/
+obj-$(CONFIG_LOCK_HARNESS)	+= gfs_locking/
+obj-$(CONFIG_GFS_FS)		+= gfs/
diff -up --recursive 2.6.11.3.clean/net/socket.c 2.6.11.3/net/socket.c
--- 2.6.11.3.clean/net/socket.c	2005-03-13 01:44:20.000000000 -0500
+++ 2.6.11.3/net/socket.c	2005-03-29 20:00:42.000000000 -0500
@@ -2072,6 +2072,12 @@ void socket_seq_show(struct seq_file *se
 }
 #endif /* CONFIG_PROC_FS */
 
+/* Cluster devices need these, or better: kernel interfaces */
+
+EXPORT_SYMBOL_GPL(sys_connect);
+EXPORT_SYMBOL_GPL(sys_recvmsg);
+EXPORT_SYMBOL_GPL(sys_socket);
+
 /* ABI emulation layers need these two */
 EXPORT_SYMBOL(move_addr_to_kernel);
 EXPORT_SYMBOL(move_addr_to_user);
